Re: Broadcast State + Stateful Operator + Async IO
Thanks a lot for your quick response! Unfortunately, your suggestion would not work for our use case. Ours is a streaming system that must process 100,000 messages per second and produce results immediately, so rerunning the job is simply not an option. The job is broken down into various operators with very strict latency requirements (under 10 seconds at all times). There can be multiple messages for a given entity in quick succession, and ordered processing is another strict requirement. The question is how we can best leverage Flink's stateful stream processing together with async IO.
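To make the ordering requirement concrete: one common pattern, sketched here in plain Scala and not as a Flink API, is to chain each entity's async work onto that entity's previous future. Messages for the same key then complete in arrival order, while different keys proceed concurrently. `KeyedSequencer`, `submit`, and `KeyedSequencerDemo` are hypothetical names for illustration. In a real Flink job the completed results would still have to be handed back to the task thread before touching keyed state, since that state is not meant to be accessed from other threads.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper (not a Flink API): runs async work for the same key
// strictly one after another, while different keys proceed concurrently.
final class KeyedSequencer[K](implicit ec: ExecutionContext) {
  // Last pending future per key; guarded by `tails.synchronized`.
  private val tails = mutable.Map.empty[K, Future[Any]]

  def submit[T](key: K)(work: => Future[T]): Future[T] = tails.synchronized {
    val prev = tails.getOrElse(key, Future.unit)
    // `work` is by-name, so it only starts after the previous item for this
    // key has finished (successfully or not).
    val next = prev.recover { case _ => () }.flatMap(_ => work)
    tails.update(key, next)
    // Drop the map entry once this item is still the tail and has completed.
    next.onComplete { _ =>
      tails.synchronized {
        if (tails.get(key).contains(next)) tails.remove(key)
      }
    }
    next
  }
}

object KeyedSequencerDemo {
  import ExecutionContext.Implicits.global

  // Three messages for one entity; later ones are faster, so without the
  // sequencer they would likely finish first.
  def run(): Vector[String] = {
    val sequencer = new KeyedSequencer[String]
    val log = mutable.ArrayBuffer.empty[String]
    val futures = Seq((1, 300L), (2, 100L), (3, 10L)).map { case (n, delayMs) =>
      sequencer.submit("entity-1") {
        Future {
          Thread.sleep(delayMs) // stand-in for an external call
          log.synchronized { log += s"entity-1#$n" }
          n
        }
      }
    }
    Await.result(Future.sequence(futures), 5.seconds)
    log.synchronized(log.toVector)
  }
}
```

Here `KeyedSequencerDemo.run()` yields `Vector("entity-1#1", "entity-1#2", "entity-1#3")` even though the third call is the fastest. The design trades some per-key parallelism for ordering: work for one entity never overlaps, but unrelated entities are not held up.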
Re: Broadcast State + Stateful Operator + Async IO
Hi Vishal,

If your scenario requires updating all of the data every time, one idea is to rerun the job each time. For example, you could have an `EnrichWithDatabaseAndWebService` job that reads all the data from a data source and then joins it with the DB and web services. Every time you need to re-enrich, you start the job again.

Also, could you briefly describe how often this happens?

Best,
Guowei

On Fri, Apr 29, 2022 at 2:20 PM Vishal Surana wrote:
> Yes. You have explained my requirements exactly as they are. My operator
> will talk to multiple databases and a couple of web services to enrich
> incoming input streams. I cannot think of a way to use the async IO
> operator. So I thought maybe convert these 7-10 calls into async calls and
> chain the Futures together. I believe I have to block once in the end of
> the KeyedBroadcastProcessFunction but if there's a way to avoid that also
> while also ensuring ordered processing of events, then do let me know.
Re: Broadcast State + Stateful Operator + Async IO
Yes. You have explained my requirements exactly. My operator will talk to multiple databases and a couple of web services to enrich incoming input streams, and I cannot think of a way to use the async IO operator. So I thought I might convert these 7-10 calls into async calls and chain the Futures together. I believe I will have to block once at the end of the KeyedBroadcastProcessFunction, but if there's a way to avoid that while still ensuring ordered processing of events, please let me know.

On Fri, Apr 29, 2022 at 7:35 AM Guowei Ma wrote:
> Hi Vishal
>
> I want to understand your needs first. Your requirements are: After a
> stateful operator receives a notification, it needs to traverse all the
> data stored in the operator state, communicate with an external system
> during the traversal process (maybe similar to join?). In order to improve
> the efficiency of this behavior, you want to take an asynchronous
> approach. That is, if you modify the state of different keys, do not block
> each other due to external communication.
> If I understand correctly, according to the existing function of
> KeyedBroadcastProcessFunction, it is really impossible.
> As for whether there are other solutions, it may depend on specific
> scenarios, such as what kind of external system. So could you describe in
> detail what scenario has this requirement, and what are the external
> systems it depends on?
>
> Best,
> Guowei

--
Regards,
Vishal
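The "chain the Futures together and block once" idea can be sketched with a for-comprehension: independent lookups are started eagerly so they run in parallel, a dependent call is chained after them, and a single `Await.result` waits for the whole chain. The three lookup functions are hypothetical stand-ins for the 7-10 database and web-service calls, and the 10-second timeout only mirrors the latency budget mentioned in this thread. Calling `Await.result` inside `processElement` would block the task thread for as long as the slowest chain takes.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EnrichmentChain {
  import ExecutionContext.Implicits.global

  // Hypothetical stand-ins for the database and web-service calls.
  def lookupCustomer(id: String): Future[String] = Future(s"customer($id)")
  def lookupAccount(id: String): Future[String] = Future(s"account($id)")
  def scoreRisk(customer: String, account: String): Future[Int] =
    Future(customer.length + account.length)

  def enrich(id: String): Future[(String, String, Int)] = {
    // Start the independent lookups before the for-comprehension so they
    // run concurrently instead of one after another.
    val customerF = lookupCustomer(id)
    val accountF = lookupAccount(id)
    for {
      customer <- customerF
      account  <- accountF
      risk     <- scoreRisk(customer, account) // depends on both results
    } yield (customer, account, risk)
  }

  // Block exactly once, bounded by a timeout.
  def enrichBlocking(id: String): (String, String, Int) =
    Await.result(enrich(id), 10.seconds)
}
```

For example, `EnrichmentChain.enrichBlocking("42")` returns `("customer(42)", "account(42)", 23)`. If a lookup were created inside the for-comprehension instead of before it, it would only start after the previous one finished.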
Re: Broadcast State + Stateful Operator + Async IO
Hi Vishal,

I want to understand your needs first. Your requirements are: after a stateful operator receives a notification, it needs to traverse all the data stored in the operator state and communicate with an external system during the traversal (maybe similar to a join?). To make this more efficient, you want to take an asynchronous approach; that is, when you modify the state of different keys, they should not block each other because of external communication.

If I understand correctly, this is not possible with the existing functionality of KeyedBroadcastProcessFunction. Whether there are other solutions may depend on the specific scenario, such as what kind of external system is involved. So could you describe in detail the scenario that has this requirement and the external systems it depends on?

Best,
Guowei

On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana wrote:
> Hello,
> My application has a stateful operator which leverages RocksDB to store a
> large amount of state. It, along with other operators, receives configuration
> as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
> upon another input stream that triggers some communication with external
> services whose results are then combined to yield the state that gets
> stored in RocksDB.
>
> In order to make the application more efficient, I am going to switch to
> asynchronous IO, but as the result is ultimately going to be a (Scala)
> Future, I will have to block once to get the result. I was hoping to
> leverage the Async IO operator, but that apparently doesn't support RocksDB
> based state storage. Am I correct in saying that
> KeyedBroadcastProcessFunction is the only option I have? If so, then I
> want to understand how registering a future's callbacks (via onComplete)
> works with a synchronous operator such as KeyedBroadcastProcessFunction.
> Will the thread executing the function simply relinquish control to some
> other subtask while the results of the external services are being awaited?
> Will the callback eventually be triggered automatically, or will I have to
> explicitly block on the result future like so: Await.result(f, timeout)?
>
> --
> Regards,
> Vishal
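On the `onComplete` question: in Scala, `onComplete` only registers a callback and returns immediately. The callback later runs on a thread of the implicit `ExecutionContext`, not on the thread that registered it, and the registering thread is neither suspended nor handed to another subtask; it simply carries on. The small demo below (hypothetical names, no Flink involved) checks both points. In a `KeyedBroadcastProcessFunction` this would mean the callback runs outside the task thread, so it should not update keyed state or call the `Collector` directly; blocking with `Await.result` is the simple, if latency-costly, alternative.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object OnCompleteDemo {
  import ExecutionContext.Implicits.global

  // Returns (registration returned before completion,
  //          callback ran on a different thread, value seen by the callback).
  def run(): (Boolean, Boolean, Try[Int]) = {
    val callerThread = Thread.currentThread()
    val gate = new CountDownLatch(1)
    val observed = Promise[(Thread, Try[Int])]()

    // Stand-in for an external call: it cannot finish until the gate opens.
    val f = Future { gate.await(); 42 }

    // onComplete only registers the callback and returns at once;
    // the caller is not suspended and keeps running.
    f.onComplete(result => observed.success((Thread.currentThread(), result)))
    val returnedBeforeCompletion = !f.isCompleted

    gate.countDown() // let the "external call" finish
    // Only here does the caller block, and only because we ask it to.
    val (callbackThread, result) = Await.result(observed.future, 5.seconds)
    (returnedBeforeCompletion, callbackThread ne callerThread, result)
  }
}
```

`OnCompleteDemo.run()` returns `(true, true, Success(42))`: the callback fires automatically once the future completes, without any `Await`, but on a pool thread rather than the caller's.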