Re: Throttling/effective back-pressure on a Kafka sink
Was any progress ever made on this? We have seen the same issue in the past. What I do remember is that the job crashes once whatever interval I set max.block.ms to has elapsed. I am going to attempt to reproduce the issue again and will report back.
On 3/28/19 3:27 PM, Konstantin Knauf wrote:
Hi Marc, the Kafka Producer should be able to create backpressure. Could you try to increase max.block.ms to Long.MAX_VALUE? The exceptions you shared for the failure case don't look like the root causes of the problem. Could you share the full stacktraces or even full logs for this time frame? Feel free to send these logs to me directly if you don't want to share them on the list. Best, Konstantin
On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding wrote:
Hi, We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t able to back-pressure or throttle the number of messages going to Kafka, causing the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 8396 record(s) for topic-1:12 ms has passed since batch creation
We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka cluster is running version 2.1.1. The Kafka producer uses all default settings except for:
compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips
I tried playing around with the buffer and batch settings and increasing timeouts, but none seem to be what we need. Increasing delivery.timeout.ms and request.timeout.ms solves the initial error, but causes the Flink job to fail entirely due to:
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
My assumption is that the Kafka producer will start blocking since it notices that it can't handle the batches, and Flink eventually runs out of buffers for the operator. What really baffles me is that the backpressure tab shows that everything is OK. The entire job pipeline (which reads from 4 different topics, unions them all and sinks towards 1 topic) pushes all the messages through to the sink stage, resulting in 18 million incoming stage messages, even though Kafka cannot possibly keep up with this. I searched for others facing the same issue but can't find anything similar. I'm hoping that someone here could guide me in the right direction. Thanks in advance
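For reference, the producer settings mentioned in this thread can be collected as follows. This is only a sketch: the class name `KafkaSinkProducerProps` is made up for illustration, the four non-default values are the ones Marc listed, and `max.block.ms` is set as Konstantin suggested. It assumes the resulting `Properties` object is what gets handed to the Kafka sink as its producer configuration; whether this actually resolves the failure is exactly what the thread leaves open.

```java
import java.util.Properties;

// Sketch only: gathers the producer settings discussed in this thread.
// The class and method names are hypothetical, not part of Flink or Kafka.
final class KafkaSinkProducerProps {

    static Properties build() {
        Properties props = new Properties();
        // Non-default settings reported by the original poster.
        props.setProperty("compression.type", "snappy");
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("acks", "all");
        props.setProperty("client.dns.lookup", "use_all_dns_ips");
        // Suggested in the reply: let send() block (and so push back on the
        // caller) instead of timing out while the producer buffer is full.
        props.setProperty("max.block.ms", String.valueOf(Long.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // Print the effective settings so they can be compared with the job's logs.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```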
local disk cleanup after crash
I think that effort is put in to have task managers clean up their folders, however I have noticed that in some cases local folders are not cleaned up and can build up, eventually causing problems due to a full disk. As far as I know this only happens with crashes and other out-of-happy-path scenarios. I am thinking of writing a script to clean up local folders that runs before task-manager starts between restarts in the case of a crash. Assuming local recovery is not configured, what should I delete and what should I leave around? What should I keep if local recovery is configured? Under the "taskmanager.tmp.dirs" I see:
blobStore-*
flink-dist-cache-*
flink-io-*
localState/*
rocksdb-lib-*
Thanks
Re: Flink 1.7 jobmanager tries to lookup taskmanager by its hostname in k8s environment
I dealt with this issue by making the taskmanagers a statefulset. By itself, this doesn't solve the issue, because the taskmanager's `hostname` will not be a resolvable FQDN on its own; you need to append the rest of the FQDN for the statefulset's "serviceName" to make it resolvable. I handle this by passing the fully qualified serviceName in as an environment variable and using it to overwrite taskmanager.host in flink-conf.yaml in the container's entrypoint script. It's a kludge, but it works. Using statefulsets brings along a lot of "baggage" that may be overkill for taskmanagers. However, it does have an unrelated benefit for jobs with large state, in that you can attach dedicated disks in the form of PVCs, rather than using up the host's root disk.
On 12/12/18 8:20 AM, Chesnay Schepler wrote:
This is a known issue, see https://issues.apache.org/jira/browse/FLINK-11127. I'm not aware of a workaround.
On 12.12.2018 14:07, Sergei Poganshev wrote:
When I deploy a Flink 1.7 job to Kubernetes, the job itself runs, but upon visiting the Flink UI I can see no metrics and there are WARN messages in the jobmanager's log:
[flink-metrics-14] WARN akka.remote.ReliableDeliverySupervisor flink-metrics-akka.remote.default-remote-dispatcher-3 - Association with remote system [akka.tcp://flink-metrics@adhoc-historical-taskmanager-d4b65dfd4-h5nrx:44491] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@adhoc-historical-taskmanager-d4b65dfd4-h5nrx:44491]] Caused by: [adhoc-historical-taskmanager-d4b65dfd4-h5nrx: Name or service not known]
Note: adhoc-historical-taskmanager-d4b65dfd4-h5nrx is the hostname of the pod on which the taskmanager is running. So, the jobmanager tries to resolve the taskmanager's hostname (which it probably got from the taskmanager itself) on a random port. How can this be mitigated?
Re: Problem with metrics inside Kubernetes
See my reply I just posted to the thread "Flink 1.7 jobmanager tries to lookup taskmanager by its hostname in k8s environment". On 1/2/19 11:19 AM, Steven Nelson wrote: I have been working with Flink under Kubernetes recently and I have run into some problems with metrics. I think I have it figured out though. It appears that it's trying to use hostname resolution for the jobmanagers. This causes this error: Association with remote system [akka.tcp://flink@flink-taskmanager-7dffcf7975-vb2pc:42028] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager-7dffcf7975-vb2pc:42028]] Caused by: [flink-taskmanager-7dffcf7975-vb2pc] I noticed that if I put hosts file entries on the jobmanager for each of the task managers then everything started working. Is there a way to specify the hostname of taskmanager like you can with the jobmanager? -Steve
Re: long lived standalone job session cluster in kubernetes
Sounds good. Is someone working on this automation today? If not, although my time is tight, I may be able to work on a PR to get us started down the path to a Kubernetes-native cluster mode.
On 12/4/18 5:35 AM, Till Rohrmann wrote:
Hi Derek, what I would recommend is to trigger the cancel-with-savepoint command [1]. This will create a savepoint and terminate the job execution. Next you simply need to respawn the job cluster, which you provide with the savepoint to resume from.
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
Cheers, Till
On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin <and...@data-artisans.com> wrote:
Hi Derek, I think your automation steps look good. Recreating deployments should not take long and, as you mention, this way you can avoid unpredictable old/new version collisions. Best, Andrey
> On 4 Dec 2018, at 10:22, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi Derek,
>
> I am not an expert in kubernetes, so I will cc Till, who should be able
> to help you more.
>
> As for automating a similar process, I would recommend having a
> look at dA platform[1], which is built on top of kubernetes.
>
> Best,
>
> Dawid
>
> [1] https://data-artisans.com/platform-overview
>
> On 30/11/2018 02:10, Derek VerLee wrote:
>>
>> I'm looking at the job cluster mode, it looks great and I am
>> considering migrating our jobs off our "legacy" session cluster and
>> into Kubernetes.
>>
>> I do need to ask some questions because I haven't found a lot of
>> details in the documentation about how it works yet, and I gave up
>> following the DI around in the code after a while.
>>
>> Let's say I have a deployment for the job "leader" in HA with ZK, and
>> another deployment for the taskmanagers.
>>
>> I want to upgrade the code or configuration and start from a
>> savepoint, in an automated way.
>>
>> Best I can figure, I can not just update the deployment resources in
>> kubernetes and allow the containers to restart in an arbitrary order.
>>
>> Instead, I expect sequencing is important, something along the lines
>> of this:
>>
>> 1. issue savepoint command on leader
>> 2. wait for savepoint
>> 3. destroy all leader and taskmanager containers
>> 4. deploy new leader, with savepoint url
>> 5. deploy new taskmanagers
>>
>> For example, I imagine old taskmanagers (with an old version of my
>> job) attaching to the new leader and causing a problem.
>>
>> Does that sound right, or am I overthinking it?
>>
>> If not, has anyone tried implementing any automation for this yet?
long lived standalone job session cluster in kubernetes
I'm looking at the job cluster mode, it looks great and I am considering migrating our jobs off our "legacy" session cluster and into Kubernetes. I do need to ask some questions because I haven't found a lot of details in the documentation about how it works yet, and I gave up following the DI around in the code after a while. Let's say I have a deployment for the job "leader" in HA with ZK, and another deployment for the taskmanagers. I want to upgrade the code or configuration and start from a savepoint, in an automated way. Best I can figure, I can not just update the deployment resources in kubernetes and allow the containers to restart in an arbitrary order. Instead, I expect sequencing is important, something along the lines of this:
1. issue savepoint command on leader
2. wait for savepoint
3. destroy all leader and taskmanager containers
4. deploy new leader, with savepoint url
5. deploy new taskmanagers
For example, I imagine old taskmanagers (with an old version of my job) attaching to the new leader and causing a problem. Does that sound right, or am I overthinking it? If not, has anyone tried implementing any automation for this yet?
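To make the ordering concern concrete, the five steps from the post can be sketched as a small driver. Everything here is hypothetical: `ClusterOps`, its method names and the placeholder savepoint URL are invented for illustration and do not correspond to any Flink or Kubernetes API. The only point is that teardown is refused unless a savepoint location came back, and that the old taskmanagers are gone before the new leader starts.

```java
import java.util.ArrayList;
import java.util.List;

// Runs the upgrade steps strictly in order; aborts before teardown if no savepoint exists.
final class UpgradeSequence {

    static void run(ClusterOps ops) {
        // Steps 1-2: trigger the savepoint and wait until it has completed.
        String savepointUrl = ops.takeSavepointAndWait();
        if (savepointUrl == null || savepointUrl.isEmpty()) {
            throw new IllegalStateException("no savepoint; refusing to tear down the old cluster");
        }
        ops.destroyLeaderAndTaskManagers();   // step 3: no old containers may survive
        ops.deployLeader(savepointUrl);       // step 4: new leader resumes from the savepoint
        ops.deployTaskManagers();             // step 5: only new taskmanagers can attach
    }

    public static void main(String[] args) {
        RecordingOps ops = new RecordingOps();
        run(ops);
        System.out.println(ops.calls);
    }
}

// Hypothetical abstraction over whatever tooling actually talks to Flink and Kubernetes.
interface ClusterOps {
    String takeSavepointAndWait();
    void destroyLeaderAndTaskManagers();
    void deployLeader(String savepointUrl);
    void deployTaskManagers();
}

// Fake implementation that only records the call order, for trying out the sequence.
final class RecordingOps implements ClusterOps {
    final List<String> calls = new ArrayList<>();

    public String takeSavepointAndWait() {
        calls.add("savepoint");
        return "file:///tmp/savepoints/sp-1"; // placeholder location, not a real savepoint
    }

    public void destroyLeaderAndTaskManagers() { calls.add("destroy"); }

    public void deployLeader(String savepointUrl) { calls.add("leader:" + savepointUrl); }

    public void deployTaskManagers() { calls.add("taskmanagers"); }
}
```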
strange behavior with jobmanager.rpc.address on standalone HA cluster
Two things:
1. It would be beneficial, I think, to drop a line somewhere in the docs (probably on the production-ready checklist as well as the HA page) explaining that enabling zookeeper "high-availability" allows your jobs to restart automatically after a jobmanager crash or restart. We had spent some cycles trying to implement job restarting and watchdogs (poorly) when I discovered this from a Flink Forward presentation on YouTube.
2. I seem to have found some odd behavior with HA and then found something that works, but I can't explain why. The CliffsNotes version is that I took an existing standalone cluster with a single JM and modified it to use zookeeper high-availability mode. The same flink-conf.yaml file is used on all nodes (including the JM). This seemed to work fine: I restarted the JM (jm0) and the jobs relaunched when it came back. Easy! Then I deployed a second JM (jm1). I modified `masters`, set the HA rpc port range and opened those ports on the firewall for both jobmanagers, but left `jobmanager.rpc.address` at the original value, `jm0`, on all nodes. I then observed that jm0 worked fine, taskmanagers connected to it and jobs ran. jm1 did not 301 me to jm0, however; it displayed a dashboard (no jobs, no tm). When I stopped jm0, the jobs showed up on jm1 as RESTARTING, but the taskmanagers never attached to jm1. In the logs, all nodes, including jm1, had messages about trying to reach jm0. From the documentation and various comments I've seen, `jobmanager.rpc.address` should be ignored. However, commenting it out entirely led to jobmanagers crashing at boot, and setting it to `localhost` caused all the taskmanagers to log messages about trying to connect to the jobmanager at localhost. What finally worked was to set the value on each node to that node's own hostname (wherever that copy of flink-conf.yaml lives), even on the taskmanagers. Does this seem like a bug?
Just a hunch, but is there something called an "akka leader" that is different from the jobmanager leader, and could it be somehow defaulting its value over to jobmanager.rpc.address?
Re: intentional back-pressure (or a poor man's side-input)
Thanks for the thoughts Piotr. Seems I have a talent for asking (nearly) the same question as someone else at the same time, and the check-pointing was raised in that thread as well. I guess one way to conceptualize it is that you have a stream job that has "phases" and transitions between those phases. Maybe there would be a new type of barrier to indicate a change between phases? But now I'm way outside the bounds of hoping to have a "quick and dirty" version of a proper side input implementation. I'm chewing on two new ideas now: using a "union" stream instead of two streams, and a custom source backed by two different sources under the hood, so the "state machine" logic transitioning from initialization to normal operation all happens in the same operator. Or, running a batch or "bounded stream" job first to generate a "cache state", and then launching the main streaming job, which loads this initial state in open()... not sure how to work out the keying. I'll post back if I get anywhere with these ideas.
On 5/3/18 10:49 AM, Piotr Nowojski wrote:
Maybe it could work with Flink 1.5’s credit-based flow control. But you would need a way to express the state “block one input side of the CoProcessFunction”, pass this information up to the input gate and handle it, probably similarly to how `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs in case of a checkpoint barrier. You cannot just block inside the `processElement1` method. However, I haven’t thought it through and maybe there could be some issues regarding checkpointing (what should happen to checkpoint barriers if you are blocking one side of the input? Should this block the checkpoint barrier as well? Should you cancel the checkpoint?). Piotrek
On 2 May 2018, at 16:31, Derek VerLee <derekver...@gmail.com> wrote:
I was just thinking about letting a coprocessfunction "block" or cause back pressure on one of its streams. Has this been discussed as an option? Does anyone know a way to effectively accomplish this? I think I could get a lot of mileage out of something like that without needing a full implementation of FLIP-17 (which I would eagerly await still). As mentioned on another thread, one could use a liststate to buffer the main input until the "side input" was sufficiently processed. However, the downside of this is that I have no way to control the size of those buffers, whereas with backpressure, the system will naturally take care of it.
Re: Migration to Flip6 Kubernetes
Is anyone actively working on direct Kubernetes support? I'd be excited to see this get in sooner rather than later; I'd be happy to start a PR.
On 3/22/18 10:37 AM, Till Rohrmann wrote:
Hi Edward and Eron, you're right that there is currently no JobClusterEntrypoint implementation for Kubernetes. What this entrypoint looks like mostly depends on how the job is stored and retrieved. There are multiple ways conceivable:
- The entrypoint connects to an external system from which it fetches the JobGraph
- The entrypoint contains the serialized JobGraph similar to how the YarnJobClusterEntrypoint works, but this would mean that you have a separate image per job
- The entrypoint actually executes a user jar which generates the JobGraph similar to what happens on the client when you submit a job
I'm not a Kubernetes expert and therefore I don't know what's the most idiomatic approach to it. But once we have figured this out, it should not be too difficult to write the Kubernetes JobClusterEntrypoint. If we say that Kubernetes is responsible for assigning new resources, then we need a special KubernetesResourceManager which automatically assigns all registered slots to the single JobMaster. This JobMaster would then accept all slots and scale the job to how many slots it got offered. That way we could easily let K8 control the resources. If there is a way to communicate with K8 from within Flink, then we could also implement a mode which is similar to Flink's Yarn integration. The K8RM would then ask for new pods to be started if the JM needs more slots. The per-job mode on K8 unfortunately won't make it into Flink 1.5. But I'm confident that the community will address this issue with Flink 1.6. Cheers, Till
On Wed, Mar 21, 2018 at 4:08 PM, Eron Wright wrote:
It would be helpful to expand on how, in job mode, the job graph would be produced. The phrase 'which contains the single job you want to execute' has a few meanings; I believe Till means a serialized job graph, not an executable JAR w/ main method. Till, is that correct?
On Tue, Mar 20, 2018 at 2:16 AM, Till Rohrmann wrote:
Hi Edward, you're right that Flink's Kubernetes documentation has not been updated with respect to Flip-6. This will be one of the tasks during the Flink 1.5 release testing and is still pending. A Flink cluster can be run in two modes: session mode vs per-job mode. The former starts a cluster to which you can submit multiple jobs. The cluster shares the same ResourceManager and a Dispatcher which is responsible for spawning JobMasters which execute a single job each. The latter starts a Flink cluster which is pre-initialized with a JobGraph and only runs this job. Here we also start a ResourceManager and a MiniDispatcher whose job it is to simply start a single JobMaster with the pre-initialized JobGraph. StandaloneSessionClusterEntrypoint is the entrypoint for the session mode. The JobClusterEntrypoint is the entrypoint for the per-job mode. Take a look at YarnJobClusterEntrypoint to see how the entrypoint retrieves the JobGraph from HDFS and then automatically starts executing it. There is no script which directly starts this entrypoint, but the YarnClusterDescriptor uses it when `deployJobCluster` is called.
intentional back-pressure (or a poor man's side-input)
I was just thinking about letting a coprocessfunction "block" or cause back pressure on one of its streams. Has this been discussed as an option? Does anyone know a way to effectively accomplish this? I think I could get a lot of mileage out of something like that without needing a full implementation of FLIP-17 (which I would eagerly await still). As mentioned on another thread, one could use a liststate to buffer the main input until the "side input" was sufficiently processed. However, the downside of this is that I have no way to control the size of those buffers, whereas with backpressure, the system will naturally take care of it.
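The liststate workaround mentioned above can be modelled in plain Java to show where the unbounded growth comes from. This is a sketch, not Flink code: `BufferUntilReady` is a made-up name, the `ArrayList` merely stands in for a `ListState`, and `onMainElement` / `onSideInputReady` play the role that the two inputs of a CoProcessFunction would have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "buffer the main input until the side input is ready".
final class BufferUntilReady<T> {
    private final List<T> buffer = new ArrayList<>(); // stands in for a ListState
    private final Consumer<T> downstream;
    private boolean sideInputReady = false;

    BufferUntilReady(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Main input: forwarded immediately once the side input is ready, buffered otherwise.
    void onMainElement(T element) {
        if (sideInputReady) {
            downstream.accept(element);
        } else {
            buffer.add(element); // nothing limits this: the buffer grows with the main input
        }
    }

    // Called once the side input counts as "sufficiently processed": flush in arrival order.
    void onSideInputReady() {
        sideInputReady = true;
        for (T element : buffer) {
            downstream.accept(element);
        }
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferUntilReady<String> op = new BufferUntilReady<>(System.out::println);
        op.onMainElement("a");
        op.onMainElement("b");
        System.out.println("buffered: " + op.buffered());
        op.onSideInputReady();
        op.onMainElement("c");
    }
}
```

Nothing in this model can slow the main input down, which is the downside described in the post: back pressure would bound the buffer, whereas here it is bounded only by how long the side input takes.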
Clarification on slots and efficiency
From the docs ( https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html ) By adjusting the number of task slots, users can define how subtasks are isolated from each other. Having one slot per TaskManager means each task group runs in a separate JVM (which can be started in a separate container, for example). Having multiple slots means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, thus reducing the per-task overhead. Does this mean that if the same task and job is running in two slots on the same task-manager, that messages that happen to move between these slots will do so more efficiently, and avoid serialization overhead?
substantial realistic and idiomatic example applications
We are new to working with Flink and now that we have some basics down, we are looking for some codebases for Flink applications of real-world complexity and size, that could additionally be considered idiomatic and generally good code. Can anyone recommend such a codebase? Thanks, _derek
Re: Streaming : a way to "key by partition id" without redispatching data
I was about to ask this question myself. I find myself re-keying by the same keys repeatedly. I think in principle you could always just roll more work into one window operation with a more complex series of maps/folds/windowfunctions or processfunction. However, this doesn't always feel the most clean, convenient, or composable. It would be great if there was a way to just express that you want to keep the same partitions as the last window, or that the new key is 1-to-1 with the previous one. Even more generally, if the new key is "based" off the old key in a way that is one-to-one or one-to-many, in either case it may not be necessary to send data over the wire, although in the latter case there is a risk of hot-spotting, I suppose.
On 11/10/17 12:01 PM, Gwenhael Pasquiers wrote:
I think I finally found a way to “simulate” a Timer thanks to the processWatermark function of the AbstractStreamOperator. Sorry for the monologue.
From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] Sent: Friday, 10 November 2017 16:02 To: 'user@flink.apache.org' Subject: RE: Streaming : a way to "key by partition id" without redispatching data
Hello, Finally, even after creating my operator, I still get the error: “Timers can only be used on keyed operators”. Isn’t there any way around this? A way to “key” my stream without shuffling the data?
From: Gwenhael Pasquiers Sent: Friday, 10 November 2017 11:42 To: Gwenhael Pasquiers ; 'user@flink.apache.org' Subject: RE: Streaming : a way to "key by partition id" without redispatching data
Maybe you don’t need to bother with that question. I’m currently discovering AbstractStreamOperator, OneInputStreamOperator and Triggerable. That should do it :-)
From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] Sent: Thursday, 9 November 2017 18:00 To: 'user@flink.apache.org' Subject: Streaming : a way to "key by partition id" without redispatching data
Hello, (Flink 1.2.1) For performance reasons I’m trying to reduce the volume of data of my stream as soon as possible by windowing/folding it for 15 minutes before continuing to the rest of the chain, which contains keyBys and windows that will transfer data everywhere. Because of the huge volume of data, I want to avoid “moving” the data between partitions as much as possible (unlike what a naïve KeyBy does). I wanted to create a custom ProcessFunction (using timer and state to fold data for X minutes) in order to fold my data over itself before keying the stream, but even ProcessFunction needs a keyed stream… Is there a specific “key” value that would ensure my data won’t be moved to another taskmanager (i.e. that its hashcode will match the partition it is already in)? I thought about the subtask id but I doubt I’d be that lucky :-)
Suggestions
- Wouldn’t it be useful to be able to do a “partitionnedKeyBy” that would not move data between nodes, for windowing operations that can be parallelized?
  - Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) => second folding => ….
- Finally, aren’t all streams keyed? Even if they’re keyed by a totally arbitrary partition id until the user chooses their own key, shouldn’t we be able to do a window (not windowAll) or process over any normal Stream’s partition?
B.R. Gwenhaël PASQUIERS
Re: Generate watermarks per key in a KeyedStream
We are contending with the same issue, as it happens. We have dozens of different "time systems", as you put it, and down the line may need to deal with thousands, which may not be known at compile time or job start time. In a practical sense, how could such a system be composed?
On 11/9/17 5:52 AM, Shailesh Jain wrote:
Thanks for your reply, Xingcan.
On Wed, Nov 8, 2017 at 10:42 PM, Xingcan Cui wrote:
Hi Shailesh, actually, the watermarks are generated per partition, but all of them will be forcibly aligned to the minimum one during processing. That is decided by the semantics of watermark and KeyedStream, i.e., the watermarks belong to a whole stream and a stream is made up of different partitions (one per key). If the physical devices work in different time systems due to delay, the event streams from them should be treated separately. Hope that helps. Best, Xingcan
On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain wrote:
Hi, I'm working on implementing a use case wherein different physical devices are sending events, and due to network/power issues, there can be a delay in receiving events at the Flink source. One of the operators within the Flink job is the Pattern operator, and there are certain patterns which are time sensitive, so I'm using the event time characteristic. But the problem comes when there are unpredictable delays in events from particular devices, which causes those events to be dropped (as I cannot really define a static bound to allow for lateness). Since I'm using a KeyedStream, keyed on the source device ID, is there a way to allow each CEP operator instance (one per key) to progress its time based on the event time in the corresponding stream partition? Or in other words, is there a way to generate watermarks per partition in a KeyedStream? Thanks, Shailesh
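The "aligned to the minimum" behaviour Xingcan describes can be illustrated with a small model. This is a plain-Java sketch of the idea only, not Flink's implementation; `MinWatermarkTracker` and its method are invented names, and "channel" here simply means one of the inputs feeding an operator.

```java
import java.util.Arrays;

// Model of an operator whose event time is the minimum watermark over all its inputs.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing seen yet on any input
    }

    // Records a watermark from one input and returns the operator's resulting watermark.
    long onWatermark(int channel, long watermark) {
        // Watermarks never move backwards on a single input.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min); // the operator's time only advances
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 100)); // input 1 has not reported yet
        System.out.println(tracker.onWatermark(1, 40));  // the slower input sets the pace
        System.out.println(tracker.onWatermark(1, 150)); // now input 0 is the slower one
    }
}
```

In this model a single delayed device holds back time for every key that shares the operator, and once the watermark has passed, that device's older events count as late. That is the situation described in the original question.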
Re: Do timestamps and watermarks exist after window evaluation?
This new documentation seems to answer my question directly. It's good to know my intuitions were not wildly off. Also thank you for continuing to improve the already good documentation. Funny enough, some of the other questions I have were also asked by other users in the last couple of days, so I'll just reply on those threads if necessary. Thanks!
On 11/9/17 9:32 AM, Aljoscha Krettek wrote:
Hi, This new section in the windowing documentation will help answer your question: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#working-with-window-results Please let me know if you have any further questions. :-) Best, Aljoscha
On 8. Nov 2017, at 18:54, Derek VerLee <derekver...@gmail.com> wrote:
When composing ("chaining") multiple windowing operations on the same stream, are watermarks transmitted downstream after window evaluation, and are the records emitted from WindowFunctions given timestamps? Do I need to or should I always assignTimestampsAndWatermarks to the outputs of window evaluations if I want to window again? If automatically assigned, how should I think about them in an event time context? Would the event time of a record resulting from a WindowFunction be the window's end time in the case of a TimeWindow?
Do timestamps and watermarks exist after window evaluation?
When composing ("chaining") multiple windowing operations on the same stream, are watermarks transmitted downstream after window evaluation, and are the records emitted from WindowFunctions given timestamps? Do I need to or should I always assignTimestampsAndWatermarks to the outputs of window evaluations if I want to window again? If automatically assigned, how should I think about them in an event time context? Would the event time of a record resulting from a WindowFunction be the window's end time in the case of a TimeWindow?
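As far as I understand the Flink windowing documentation ("Working with window results"), a window's results carry the window's maximum timestamp, i.e. the end timestamp minus 1. The arithmetic can be checked with a small sketch. Assumptions: tumbling windows with zero offset, and a class and formulas written here for illustration rather than taken from Flink's source.

```java
// Timestamp arithmetic for tumbling event-time windows (zero offset assumed).
final class TumblingWindowTimes {

    // Start of the window containing the timestamp (inclusive).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // End of the window containing the timestamp (exclusive).
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    // Timestamp given to the window's result: the largest timestamp still inside the window.
    static long resultTimestamp(long timestamp, long size) {
        return windowEnd(timestamp, size) - 1;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long fiveMinutes = 300_000L;
        long event = 125_000L;

        // First window: 1-minute tumbling window containing the event.
        long first = resultTimestamp(event, oneMinute);
        System.out.println("1-minute result timestamp: " + first);

        // Second window: feed the first result into a 5-minute tumbling window.
        System.out.println("5-minute window start: " + windowStart(first, fiveMinutes));
        System.out.println("5-minute result timestamp: " + resultTimestamp(first, fiveMinutes));
    }
}
```

Because the result timestamp is `end - 1` rather than `end`, a one-minute result always falls inside the five-minute window that covers the same minute, which is what makes consecutive windows line up without reassigning timestamps.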
RocksDB usage for broad slow data
We have data which is broad and slow: hundreds of thousands of keys, of which a small number will get an event every few seconds, some get an event every few days, and the vast majority will get an event a few times an hour. Let's say then that keeping this data in heap for the last couple of days is not a challenge. However, one additional challenge is that we can receive late events or corrective data, going back indefinitely, and while this is infrequent, we need to be able to handle it gracefully. Let's say that the total data set grows too large to keep in memory economically. One approach of course is a "lambda" type, where sufficiently late events are noted to a side channel, perhaps triggering some batch job to be scheduled. However, I'm pondering a simpler solution. I understand that with RocksDB, the state size can exceed the heap. Would it be a plausible approach in this situation to never purge windows, keeping computation state back to "the beginning", so that an arbitrarily old window (years, potentially) could re-emit a corrected value? Thanks _Derek
Re: Enriching data from external source with cache
Thanks Timo, watching the video now. I did try out the method with iteration in a simple prototype and it works. But you are right, combining it with the other requirements into a single process function has so far resulted in more complexity than I'd like, and it's always nice to leave something easily understood later. On the contribution, I was wondering whether there was something scary going on with async and keyed state that prevented this from having happened already. I'll have a look and see if I can find where the current non-keyed implementation logic resides in the project. Thanks
On 10/2/17 6:07 AM, Timo Walther wrote:
Hi Derek, maybe the following talk can inspire you on how to do this with joins and async IO: https://www.youtube.com/watch?v=Do7C4UJyWCM (around the 17th min). Basically, you split the stream and wait for an Async IO result in a downstream operator. But I think having a transient guava cache is not a bad idea; since it is only a cache, it does not need to be checkpointed and can be recovered at any time. Implementing your own logic in a ProcessFunction is always a way, but might require more implementation effort. Btw. if you feel brave enough, you could also think of contributing a stateful async IO. It should not be too much effort to make this work. Regards, Timo
On 9/29/17 at 8:39 PM, Derek VerLee wrote:
My basic problem will sound familiar, I think: I need to enrich incoming data using a REST call to an external system for slowly evolving metadata. Some cache-based lag is acceptable, so to reduce load on the external system and to process more efficiently, I would like to implement a cache. The cache would be by key, and I am already doing a keyBy for the same key in the job. Please correct me if I'm wrong: Keyed State would be great to store my metadata "cache", and Async I/O is ideal for pulling from the external system, but AsyncFunction can not access keyed state ("Exception: State is not supported in rich async functions.") and operators can not share state between them. This leaves me wondering, since side inputs are not here yet, what the best (and perhaps most idiomatic) way to approach my problem is. I'd rather keep changes to existing systems minimal for this iteration and just minimize impact on them during peaks as best I can... systemic refactoring and re-architecture will be coming soon (so I'm happy to hear thoughts on that as well). Approaches considered:
1. AsyncFunction with a transient guava cache. Not ideal ... but maybe good enough to get by
2. Using compound message types (oh, if only java had real algebraic data types...) and sending cache miss messages from some CacheEnrichmentMapper (keyed) to some AsyncCacheLoader (not keyed), which then backfeeds cache updates to the former via iteration ... I don't know why this couldn't work but it feels like a hot mess unless there is some way I am not thinking of to do it cleanly
3. One user mentioned on a similar thread loading the data in as another DataStream and then using joins, but I'm confused about how this would work; it seems to me that joins happen on windows, windows pertain to (some notion of) time, so what would be my notion of time for the slow (maybe years old in some cases) metadata?
4. Forget about async I/O
5. Implement my own "async i/o" using a process function or similar... is this a valid pattern?
Enriching data from external source with cache
My basic problem will sound familiar, I think: I need to enrich incoming data using a REST call to an external system for slowly evolving metadata. Some cache-based lag is acceptable, so to reduce load on the external system and to process more efficiently, I would like to implement a cache. The cache would be by key, and I am already doing a keyBy for the same key in the job. Please correct me if I'm wrong: Keyed State would be great to store my metadata "cache", and Async I/O is ideal for pulling from the external system, but AsyncFunction can not access keyed state ("Exception: State is not supported in rich async functions.") and operators can not share state between them. This leaves me wondering, since side inputs are not here yet, what the best (and perhaps most idiomatic) way to approach my problem is. I'd rather keep changes to existing systems minimal for this iteration and just minimize impact on them during peaks as best I can... systemic refactoring and re-architecture will be coming soon (so I'm happy to hear thoughts on that as well). Approaches considered:
1. AsyncFunction with a transient guava cache. Not ideal ... but maybe good enough to get by
2. Using compound message types (oh, if only java had real algebraic data types...) and sending cache miss messages from some CacheEnrichmentMapper (keyed) to some AsyncCacheLoader (not keyed), which then backfeeds cache updates to the former via iteration ... I don't know why this couldn't work but it feels like a hot mess unless there is some way I am not thinking of to do it cleanly
3. One user mentioned on a similar thread loading the data in as another DataStream and then using joins, but I'm confused about how this would work; it seems to me that joins happen on windows, windows pertain to (some notion of) time, so what would be my notion of time for the slow (maybe years old in some cases) metadata?
4. Forget about async I/O
5. Implement my own "async i/o" using a process function or similar... is this a valid pattern?
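Approach 1 (a transient, non-checkpointed cache in front of the external lookup) can be sketched without any Flink or Guava dependency. This is a minimal illustration under stated assumptions: `ExpiringCache` is a made-up class, the loader function stands in for the REST call, and the injected clock exists only to make expiry easy to exercise. It is not thread-safe, so a real AsyncFunction with concurrent callbacks would need synchronization or a proper cache library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Minimal time-based cache: entries older than ttlMillis are reloaded on access.
final class ExpiringCache<K, V> {

    private static final class Entry<T> {
        final T value;
        final long loadedAt;

        Entry(T value, long loadedAt) {
            this.value = value;
            this.loadedAt = loadedAt;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader;  // stands in for the external metadata lookup
    private final long ttlMillis;         // how much cache-based lag is acceptable
    private final LongSupplier clock;     // injected so expiry can be tested

    ExpiringCache(Function<K, V> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached value, calling the loader on a miss or when the entry has expired.
    V get(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.loadedAt >= ttlMillis) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }

    public static void main(String[] args) {
        ExpiringCache<String, String> cache =
                new ExpiringCache<>(key -> "metadata-for-" + key, 60_000L, System::currentTimeMillis);
        System.out.println(cache.get("device-1")); // first access calls the loader
        System.out.println(cache.get("device-1")); // second access is served from the cache
    }
}
```

Since the cache is rebuilt from the external system on demand, losing it on restart costs only extra lookups, which matches Timo's remark that it does not need to be checkpointed.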