Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Miguel Araújo
I've realized this is not such a big issue because the number of timer
firings is also upper bounded by the number of watermarks received, and
there won't be one watermark per event.

Miguel Araújo  wrote on Monday, 10/05/2021 at 09:39:

> Thanks Dawid, having a look at CepOperator was useful. I implemented
> something with one difference I feel might be important:
>
> I noticed that in the CepOperator the timer is being registered for
> currentWatermark+1, instead of using the event's timestamp. Is there a
> reason for this? I think this implies a quadratic number of triggers in
> the number of keys with events that arrived after the current watermark.
> For instance, if you have 1000 events per second on different keys (and
> different timestamps), a watermark that is delayed 1 second will fire ~1
> million times. Is this a requirement of the NFA implementation? Would this
> not be a problem?
>
> Thanks, once again.
>
> Dawid Wysakowicz  wrote on Monday, 10/05/2021 at 09:13:
>
>> Hey Miguel,
>>
>> I think you could take a look at the CepOperator which does pretty much
>> what you are describing.
>>
>> As for more direct answers for your questions. If you use
>> KeyedProcessFunction it is always scoped to a single Key. There is no way
>> to process events from other keys. If you want to have more control over
>> state and e.g. use PriorityQueue which would be snapshotted on checkpoint
>> you could look into using Operator API. Bear in mind it is a semi-public
>> API. It is very low level and subject to change rather frequently. Another
>> thing to consider is that if you use PriorityQueue instead of e.g. MapState
>> for buffering and ordering events you are constrained by the available
>> memory. We used PriorityQueue in the past in the CepOperator but migrated
>> it to MapState.
>>
>> It is possible that events in downstream operators can become late. It
>> all depends on the timestamp of the events you emit from the "sorting"
>> operator. If you emit records with timestamps larger than the watermark
>> that "triggered" their emission, they can become late.
>>
>> Hope those tips could help you a bit.
>>
>> Best,
>>
>> Dawid
>> On 04/05/2021 14:49, Miguel Araújo wrote:
>>
>> Hi Timo,
>>
>> Thanks for your answer. I think I wasn't clear enough in my initial
>> message, so let me give more details.
>>
>> The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
>> user-id) and then fed into a KeyedProcessFunction. I want to process all
>> events for a given user in order, before sending them downstream for
>> further processing in other operators. I don't want to hold events longer
>> than needed, hence using the watermark to signal which events can be
>> processed.
>> I don't think your suggestion of using a ListState would work, because we
>> would effectively have one list per user. That would imply (among other
>> things) that an event could only be processed when a new event for the same
>> user arrives, which would not only imply a (potentially) huge latency, but
>> also data leakage. Not to mention that the events being sent could easily
>> be considered late-events to the downstream operators.
>> The idea of keying by timestamp was an "evolution" of the ListState
>> suggestion, where events-to-be-later-processed would be kept sorted in the
>> map (which is what would be keyed by timestamp). We could iterate the map
>> to process the events, instead of fetching the full list and sorting it to
>> process the events in order. I don't think this solves any of the problems
>> mentioned above, so I think that mentioning it only raised confusion.
>>
>> Regarding global event-time order, that's not really what I'm after. I
>> only need event-time order per key, but I want to process the event as soon
>> as possible, constrained by knowing that it is "safe" to do so because no
>> event with a smaller timestamp for this key is yet to come.
>>
>> So, rephrasing my question as I'm not sure that part was clear in the
>> initial message, here is the idea:
>> - keeping one priority queue (ordered by timestamp) in each
>> KeyedProcessFunction instance. Therefore, each priority queue would store
>> events for multiple keys.
>> - when an event arrives, we push it to the queue and then process events
>> (updating state and sending them downstream) while their timestamp is lower
>> than the current watermark.
>>
>> The question is:
>> - is this fault tolerant? The priority queue is not state that is managed
>> by flink, but it should be recoverable on replay.
>> 

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Miguel Araújo
Thanks Dawid, having a look at CepOperator was useful. I implemented
something with one difference I feel might be important:

I noticed that in the CepOperator the timer is being registered for
currentWatermark+1, instead of using the event's timestamp. Is there a
reason for this? I think this implies a quadratic number of triggers in
the number of keys with events that arrived after the current watermark.
For instance, if you have 1000 events per second on different keys (and
different timestamps), a watermark that is delayed 1 second will fire ~1
million times. Is this a requirement of the NFA implementation? Would this
not be a problem?
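
For reference, here is a minimal sketch of the two registration strategies
being compared, in a plain KeyedProcessFunction. The Event type, its
getTimestamp() accessor, and the buffering details are assumptions for
illustration; this is not the actual CepOperator code:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingFunction extends KeyedProcessFunction<String, Event, Event> {

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        long watermark = ctx.timerService().currentWatermark();
        if (event.getTimestamp() > watermark) {
            // CepOperator-style: one coalesced timer per key. Re-registering
            // the same (key, timestamp) pair is deduplicated by the timer
            // service, so each key fires at most once per watermark advance.
            ctx.timerService().registerEventTimeTimer(watermark + 1);

            // Alternative discussed above: one timer per buffered event.
            // ctx.timerService().registerEventTimeTimer(event.getTimestamp());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) {
        // Drain the buffered events of this key whose timestamps are now
        // covered by the watermark (buffer state omitted in this sketch).
    }
}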

Thanks, once again.

Dawid Wysakowicz  wrote on Monday, 10/05/2021 at 09:13:

> Hey Miguel,
>
> I think you could take a look at the CepOperator which does pretty much
> what you are describing.
>
> As for more direct answers for your questions. If you use
> KeyedProcessFunction it is always scoped to a single Key. There is no way
> to process events from other keys. If you want to have more control over
> state and e.g. use PriorityQueue which would be snapshotted on checkpoint
> you could look into using Operator API. Bear in mind it is a semi-public
> API. It is very low level and subject to change rather frequently. Another
> thing to consider is that if you use PriorityQueue instead of e.g. MapState
> for buffering and ordering events you are constrained by the available
> memory. We used PriorityQueue in the past in the CepOperator but migrated
> it to MapState.
>
> It is possible that events in downstream operators can become late. It all
> depends on the timestamp of the events you emit from the "sorting"
> operator. If you emit records with timestamps larger than the watermark
> that "triggered" their emission, they can become late.
>
> Hope those tips could help you a bit.
>
> Best,
>
> Dawid
> On 04/05/2021 14:49, Miguel Araújo wrote:
>
> Hi Timo,
>
> Thanks for your answer. I think I wasn't clear enough in my initial
> message, so let me give more details.
>
> The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
> user-id) and then fed into a KeyedProcessFunction. I want to process all
> events for a given user in order, before sending them downstream for
> further processing in other operators. I don't want to hold events longer
> than needed, hence using the watermark to signal which events can be
> processed.
> I don't think your suggestion of using a ListState would work, because we
> would effectively have one list per user. That would imply (among other
> things) that an event could only be processed when a new event for the same
> user arrives, which would not only imply a (potentially) huge latency, but
> also data leakage. Not to mention that the events being sent could easily
> be considered late-events to the downstream operators.
> The idea of keying by timestamp was an "evolution" of the ListState
> suggestion, where events-to-be-later-processed would be kept sorted in the
> map (which is what would be keyed by timestamp). We could iterate the map
> to process the events, instead of fetching the full list and sorting it to
> process the events in order. I don't think this solves any of the problems
> mentioned above, so I think that mentioning it only raised confusion.
>
> Regarding global event-time order, that's not really what I'm after. I
> only need event-time order per key, but I want to process the event as soon
> as possible, constrained by knowing that it is "safe" to do so because no
> event with a smaller timestamp for this key is yet to come.
>
> So, rephrasing my question as I'm not sure that part was clear in the
> initial message, here is the idea:
> - keeping one priority queue (ordered by timestamp) in each
> KeyedProcessFunction instance. Therefore, each priority queue would store
> events for multiple keys.
> - when an event arrives, we push it to the queue and then process events
> (updating state and sending them downstream) while their timestamp is lower
> than the current watermark.
>
> The question is:
> - is this fault tolerant? The priority queue is not state that is managed
> by flink, but it should be recoverable on replay.
> - is it possible that the events I'm sending downstream become late-events
> for a different operator, for some reason? Will they always be sent before
> the watermark of the event that originated the processElement() call?
> - I would effectively be processing multiple elements (from multiple keys)
> in the same call to processElement(). Is there a way to access the state of
> different keys?
>
> This doesn't feel like the right approach. Is there an operator more
> suitable than a KeyedProcessFunction which would allow me to handle the
> state for multiple keys in this task manager?

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-04 Thread Miguel Araújo
Hi Timo,

Thanks for your answer. I think I wasn't clear enough in my initial
message, so let me give more details.

The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
user-id) and then fed into a KeyedProcessFunction. I want to process all
events for a given user in order, before sending them downstream for
further processing in other operators. I don't want to hold events longer
than needed, hence using the watermark to signal which events can be
processed.
I don't think your suggestion of using a ListState would work, because we
would effectively have one list per user. That would imply (among other
things) that an event could only be processed when a new event for the same
user arrives, which would not only imply a (potentially) huge latency, but
also data leakage. Not to mention that the events being sent could easily
be considered late-events to the downstream operators.
The idea of keying by timestamp was an "evolution" of the ListState
suggestion, where events-to-be-later-processed would be kept sorted in the
map (which is what would be keyed by timestamp). We could iterate the map
to process the events, instead of fetching the full list and sorting it to
process the events in order. I don't think this solves any of the problems
mentioned above, so I think that mentioning it only raised confusion.

Regarding global event-time order, that's not really what I'm after. I only
need event-time order per key, but I want to process the event as soon as
possible, constrained by knowing that it is "safe" to do so because no
event with a smaller timestamp for this key is yet to come.

So, rephrasing my question as I'm not sure that part was clear in the
initial message, here is the idea:
- keeping one priority queue (ordered by timestamp) in each
KeyedProcessFunction instance. Therefore, each priority queue would store
events for multiple keys.
- when an event arrives, we push it to the queue and then process events
(updating state and sending them downstream) while their timestamp is lower
than the current watermark.

The question is:
- is this fault tolerant? The priority queue is not state that is managed
by flink, but it should be recoverable on replay.
- is it possible that the events I'm sending downstream become late-events
for a different operator, for some reason? Will they always be sent before
the watermark of the event that originated the processElement() call?
- I would effectively be processing multiple elements (from multiple keys)
in the same call to processElement(). Is there a way to access the state of
different keys?

This doesn't feel like the right approach. Is there an operator more
suitable than a KeyedProcessFunction which would allow me to handle the
state for multiple keys in this task manager? Should I register a timer to
trigger on the event timestamp instead? I believe timers trigger on
watermarks, so that could theoretically work, although it feels a little
weird. After all, what I want is just to buffer events so that they are
only processed when the watermark has caught up to them.
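
For completeness, here is a minimal sketch of that timer-based variant, with
the buffer kept in MapState (scoped per key) rather than in an unmanaged
PriorityQueue, so that Flink checkpoints it. The Event type and its
getTimestamp() accessor are assumptions:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SortingFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "buffer", Types.LONG, Types.LIST(Types.POJO(Event.class))));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out)
            throws Exception {
        long ts = event.getTimestamp();
        List<Event> atTimestamp = buffer.get(ts);
        if (atTimestamp == null) {
            atTimestamp = new ArrayList<>();
        }
        atTimestamp.add(event);
        buffer.put(ts, atTimestamp);
        // The timer fires once the watermark passes this event's timestamp.
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out)
            throws Exception {
        // Timers fire in timestamp order, so emitting here preserves per-key
        // event-time order, and the emitted records are forwarded before the
        // watermark that triggered this timer, so they do not become late.
        List<Event> ready = buffer.get(timestamp);
        if (ready != null) {
            for (Event e : ready) {
                out.collect(e);
            }
            buffer.remove(timestamp);
        }
    }
}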

Thanks

Timo Walther  wrote on Friday, 30/04/2021 at 12:05:

> Hi Miguel,
>
> your initial idea sounds not too bad but why do you want to key by
> timestamp? Usually, you can simply key your stream by a custom key and
> store the events in a ListState until a watermark comes in.
>
> But if you really want to have some kind of global event-time order, you
> have two choices:
>
> - either a single operator with parallelism 1 that performs the ordering
> - or you send every event to every operator using the broadcast
> state pattern [1]
>
> It is guaranteed that a watermark will reach the downstream operator or
> sink after all events. Watermarks are synchronized across all parallel
> operator instances. You can store an uncheckpointed Map, but this means
> that you have to ensure the map is initialized again during recovery.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>
> On 30.04.21 11:37, Miguel Araújo wrote:
> > Hi everyone,
> >
> > I have a KeyedProcessFunction whose events I would like to process in
> > event-time order.
> > My initial idea was to use a Map keyed by timestamp and, when a new
> > event arrives, iterate over the Map to process events older than the
> > current watermark.
> >
> > The issue is that I obviously can't use a MapState, as my stream is
> > keyed, so the map would be scoped to the current key.
> > Is using a "regular" (i.e., not checkpointed) Map an option, given that
> > its content will be recreated by the replay of the events on a restart?
> > Is it guaranteed that the watermark that triggered the processing of
> > multiple events (and their subsequent push downstream) is not sent
> > downstream before these events themselves?
> >
> > Thanks,
> > Miguel
>
>


Guaranteeing event order in a KeyedProcessFunction

2021-04-30 Thread Miguel Araújo
Hi everyone,

I have a KeyedProcessFunction whose events I would like to process in
event-time order.
My initial idea was to use a Map keyed by timestamp and, when a new event
arrives, iterate over the Map to process events older than the current
watermark.

The issue is that I obviously can't use a MapState, as my stream is keyed,
so the map would be scoped to the current key.
Is using a "regular" (i.e., not checkpointed) Map an option, given that its
content will be recreated by the replay of the events on a restart? Is it
guaranteed that the watermark that triggered the processing of multiple
events (and their subsequent push downstream) is not sent downstream before
these events themselves?

Thanks,
Miguel


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-23 Thread Miguel Araújo
Thanks for your replies. I agree this is a somewhat general problem.
I posted it here as I was trying to register the valid subclasses in Kryo
but I couldn't get the message to go away, i.e., everything worked
correctly but there was the complaint that GenericType serialization was
being used.

This is how I was registering these types:

env.getConfig.registerKryoType(classOf[java.lang.Integer])
env.getConfig.registerKryoType(classOf[java.lang.Double])

and this is the message I got on every event:

flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
(1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
fields were detected for class java.lang.Number so it cannot be used as a
POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance.
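
As a side note, the INFO message comes from type extraction rather than from
Kryo: registerKryoType registers a class with Kryo for more efficient
serialization, but java.lang.Number itself still resolves to a GenericType.
A minimal sketch of a declaration that triggers this classification, with
the descriptor name assumed:

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Number has no fields visible to the POJO analyzer, so this resolves to a
// GenericType and logs the TypeExtractor INFO message above.
ValueStateDescriptor<Number> descriptor = new ValueStateDescriptor<>(
    "number-state", TypeInformation.of(Number.class));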

In the meanwhile, I've changed my approach to reuse a protobuf type I
already had as part of my input event.

Once again, thanks for your replies because they gave me the right
perspective.



Arvid Heise  wrote on Wednesday, 21/04/2021 at 18:26:

> Hi Miguel,
>
> as Klemens said this is a rather general problem independent of Flink: How
> do you map Polymorphism in serialization?
>
> Flink doesn't have an answer on its own, as it's discouraged (a Number can
> have arbitrarily many subclasses: how do you distinguish them except by
> classname? That adds a ton of overhead.). The easiest solution in your case
> is to convert ints into doubles.
> Or you use Kryo, which dictionary-encodes the classes and also limits the
> possible subclasses.
>
> On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> I guess this is more of a Java Problem than a Flink Problem. If you want
>> it quick and dirty you could implement a class such as:
>>
>> public class Value {
>>     private boolean isLongSet = false;
>>     private long longValue = 0L;
>>     private boolean isIntegerSet = false;
>>     private int intValue = 0;
>>
>>     public Value(final long value) {
>>         setLong(value);
>>     }
>>
>>     public void setLong(final long value) {
>>         longValue = value;
>>         isLongSet = true;
>>     }
>>
>>     public long getLong() {
>>         if (isLongSet) {
>>             return longValue;
>>         }
>>         throw new IllegalStateException("no long value set");
>>     }
>>
>>     // Add the same methods for int. To satisfy POJO requirements you will
>>     // also need to add a no-argument constructor as well as getters and
>>     // setters for the boolean flags.
>> }
>>
>> I guess a cleaner solution would be possible using a custom Kryo
>> serializer as explained here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>
>> Regards
>>   Klemens
>>
>>
>>
>> > On 20.04.2021 at 10:34, Miguel Araújo wrote:
>> >
>> > Hi everyone,
>> >
>> > I have a ProcessFunction which needs to store different number types
>> for different keys, e.g., some keys need to store an integer while others
>> need to store a double.
>> >
>> > I tried to use java.lang.Number as the type for the ValueState, but I
>> got the expected "No fields were detected for class java.lang.Number so it
>> cannot be used as a POJO type and must be processed as GenericType."
>> >
>> > I have the feeling that this is not the right approach, but the exact
>> type to be stored is only known at runtime which makes things a bit
>> trickier. Is there a way to register these classes correctly, or is it
>> preferable to use different ValueState's for different types?
>> >
>> > Thanks,
>> > Miguel
>>
>>


Correctly serializing "Number" as state in ProcessFunction

2021-04-20 Thread Miguel Araújo
Hi everyone,

I have a ProcessFunction which needs to store different number types for
different keys, e.g., some keys need to store an integer while others need
to store a double.

I tried to use java.lang.Number as the type for the ValueState, but I got
the expected "No fields were detected for class java.lang.Number so it
cannot be used as a POJO type and must be processed as GenericType."

I have the feeling that this is not the right approach, but the exact type
to be stored is only known at runtime which makes things a bit trickier. Is
there a way to register these classes correctly, or is it preferable to use
different ValueState's for different types?

Thanks,
Miguel


Re: [Statefun] Dynamic behavior

2021-02-24 Thread Miguel Araújo
Thanks Seth.
I understood Igal's suggestion. My concern was about maintaining a separate
service (outside flink/statefun) when this control stream might be an
incremental stream as well (think, rules in fraud detection - although this
is not a fraud detection application, but the example is good). I wouldn't
want to implement fault tolerance, checkpointing, HA, etc. myself.
I now see that I wasn't thinking a step ahead - just because it is a
separate service from statefun's point of view, it doesn't mean it can't be
implemented in flink if it turns out to be the most appropriate tool.

Thanks for all suggestions, this was definitely helpful.

Miguel

Seth Wiesman  wrote on Tuesday, 23/02/2021 at 17:08:

> I don't think there is anything statefun specific here and I would follow
> Igals advice.
>
> Let's say you have a state value called `Behavior` that describes the
> behavior of an instance. There is a default behavior but any given instance
> may have a customized behavior. What I would do is the following.
>
> Create a state in the TransactionManager called `behavior` that stores the
> instance's customized behavior if it exists. When a transaction comes in,
> read the behavior state. If it exists (is not None in the case of Python)
> then use that. If not, then fall back to the default instance.
>
> The default instance can be provided one of several ways depending on the
> specifics of your use case:
>
> 1) hard-coded in the function.
> 2) dynamically loaded via a background thread as a global. So long as that
> default is immutable this is safe
> 3) dynamically loaded via the function instance on first use. Stateful
> functions have strong support for making async requests so you could simply
> query the behavior for that instance on first use from a 3rd party service.
>
> Seth
>
>
> On Tue, Feb 23, 2021 at 10:55 AM Miguel Araújo 
> wrote:
>
>> Hi Seth,
>>
>> Thanks for your comment. I've seen that repository in the past and it was
>> really helpful to "validate" that this was the way to go.
>> I think my question is not being addressed there though: how could one
>> add dynamic behavior to your TransactionManager? In this case, state that
>> is available to all TransactionManager instances when they receive a
>> message of type Transaction for the first time.
>>
>> Seth Wiesman  wrote on Tuesday, 23/02/2021 at 16:02:
>>
>>> Hey Miguel,
>>>
>>> What you are describing is exactly what is implemented in this repo. The
>>> TransactionManager function acts as an orchestrator to work with the other
>>> functions. The repo is structured as an exercise but the full solution
>>> exists on the branch `advanced-solution`.
>>>
>>> https://github.com/ververica/flink-statefun-workshop
>>>
>>> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo 
>>> wrote:
>>>
>>>> Another possibility I am considering is handling this in Flink using a
>>>> broadcast and adding all the information needed to the event itself. I'm a
>>>> little concerned about the amount of data that will be serialized and sent
>>>> on every request though, as I'll need to include information about all
>>>> available remote functions, for instance.
>>>>
>>>> Miguel Araújo  wrote on Tuesday, 23/02/2021 at 09:14:
>>>>
>>>>> Hi Gordon, Igal,
>>>>>
>>>>> Thanks for your replies.
>>>>> PubSub would be a good addition, I have a few scenarios where that
>>>>> would be useful.
>>>>>
>>>>> However, after reading your answers I realized that your proposed
>>>>> solutions (which address the most obvious interpretation of my question)
>>>>> do not necessarily solve my problem. I should have just stated what it was,
>>>>> instead of trying to propose a solution by discussing broadcast...
>>>>>
>>>>> I'm trying to implement an "orchestrator" function which, given an
>>>>> event, will trigger multiple remote function calls, aggregate their results
>>>>> and eventually call yet more functions (based on a provided dependency
>>>>> graph). Hence, this orchestrator function has state per event_id and each
>>>>> function instance is short-lived (a couple seconds at most, ideally
>>>>> sub-second). The question then is not about how to modify a long-running
>>>>> function instance (which PubSub would enable), but rather how to have the
>>>>>

Re: [Statefun] Dynamic behavior

2021-02-23 Thread Miguel Araújo
Hi Seth,

Thanks for your comment. I've seen that repository in the past and it was
really helpful to "validate" that this was the way to go.
I think my question is not being addressed there though: how could one add
dynamic behavior to your TransactionManager? In this case, state that is
available to all TransactionManager instances when they receive a message
of type Transaction for the first time.

Seth Wiesman  wrote on Tuesday, 23/02/2021 at 16:02:

> Hey Miguel,
>
> What you are describing is exactly what is implemented in this repo. The
> TransactionManager function acts as an orchestrator to work with the other
> functions. The repo is structured as an exercise but the full solution
> exists on the branch `advanced-solution`.
>
> https://github.com/ververica/flink-statefun-workshop
>
> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo  wrote:
>
>> Another possibility I am considering is handling this in Flink using a
>> broadcast and adding all the information needed to the event itself. I'm a
>> little concerned about the amount of data that will be serialized and sent
>> on every request though, as I'll need to include information about all
>> available remote functions, for instance.
>>
>> Miguel Araújo  wrote on Tuesday, 23/02/2021 at 09:14:
>>
>>> Hi Gordon, Igal,
>>>
>>> Thanks for your replies.
>>> PubSub would be a good addition, I have a few scenarios where that would
>>> be useful.
>>>
>>> However, after reading your answers I realized that your proposed
>>> solutions (which address the most obvious interpretation of my question) do
>>> not necessarily solve my problem. I should have just stated what it was,
>>> instead of trying to propose a solution by discussing broadcast...
>>>
>>> I'm trying to implement an "orchestrator" function which, given an
>>> event, will trigger multiple remote function calls, aggregate their results
>>> and eventually call yet more functions (based on a provided dependency
>>> graph). Hence, this orchestrator function has state per event_id and each
>>> function instance is short-lived (a couple seconds at most, ideally
>>> sub-second). The question then is not about how to modify a long-running
>>> function instance (which PubSub would enable), but rather how to have the
>>> dependency graph available to new functions.
>>>
>>> Given this, Igal's answer seems promising because we have the
>>> FunctionProvider instantiating a local variable and passing it down on
>>> every instantiation. I'm assuming there is one FunctionProvider per
>>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>>> data coming from a Flink DataStream, or receiving StateFun messages?
>>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>>
>>> I really appreciate your help.
>>>
>>> Miguel
>>>
>>> Igal Shilman  wrote on Monday, 22/02/2021 at 12:09:
>>>
>>>> Hi Miguel,
>>>>
>>>> I think that there are a couple of ways to achieve this, and it really
>>>> depends on your specific use case, and the trade-offs
>>>> that you are willing to accept.
>>>>
>>>> For example, one way to approach this:
>>>> - Suppose you have an external service somewhere that returns a
>>>> representation of the logic to be interpreted by
>>>> your function at runtime (I think that is the scenario you are
>>>> describing)
>>>> - Then, you can write a background task (a thread) that periodically
>>>> queries that service, and keeps in memory the latest version.
>>>> - You can initialize this background task in your FunctionProvider
>>>> implementation, or even in your StatefulModule if you wish.
>>>> - Then, make sure that your dynamic stateful function has access to
>>>> the latest value fetched by your client (for example via a shared reference
>>>> like a j.u.c.AtomicReference)
>>>> - Then on receive, you can simply get that reference and re-apply your
>>>> rules.
>>>>
>>>> Take a look at [1] for example (it is not exactly the same, but I
>>>> believe that it is close enough)
>>>>
>>>> [1]
>>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>>>
>>>> Good luck,
>>>> Igal.

Re: [Statefun] Dynamic behavior

2021-02-23 Thread Miguel Araújo
Another possibility I am considering is handling this in Flink using a
broadcast and adding all the information needed to the event itself. I'm a
little concerned about the amount of data that will be serialized and sent
on every request though, as I'll need to include information about all
available remote functions, for instance.
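
For reference, here is a minimal sketch of that alternative using Flink's
broadcast state pattern. The Event, Rules, and EnrichedEvent types, the
descriptor name, and the existing events/rulesStream DataStreams are
assumptions for illustration:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Rules> rulesDescriptor = new MapStateDescriptor<>(
    "rules", Types.STRING, Types.POJO(Rules.class));

BroadcastStream<Rules> rulesBroadcast = rulesStream.broadcast(rulesDescriptor);

DataStream<EnrichedEvent> enriched = events
    .connect(rulesBroadcast)
    .process(new BroadcastProcessFunction<Event, Rules, EnrichedEvent>() {
        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                   Collector<EnrichedEvent> out) throws Exception {
            // Attach the latest broadcast information to the outgoing event.
            Rules rules = ctx.getBroadcastState(rulesDescriptor).get("current");
            out.collect(new EnrichedEvent(event, rules));
        }

        @Override
        public void processBroadcastElement(Rules rules, Context ctx,
                                            Collector<EnrichedEvent> out) throws Exception {
            // Every parallel instance receives and stores each update.
            ctx.getBroadcastState(rulesDescriptor).put("current", rules);
        }
    });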

Miguel Araújo  wrote on Tuesday, 23/02/2021 at 09:14:

> Hi Gordon, Igal,
>
> Thanks for your replies.
> PubSub would be a good addition, I have a few scenarios where that would
> be useful.
>
> However, after reading your answers I realized that your proposed
> solutions (which address the most obvious interpretation of my question) do
> not necessarily solve my problem. I should have just stated what it was,
> instead of trying to propose a solution by discussing broadcast...
>
> I'm trying to implement an "orchestrator" function which, given an event,
> will trigger multiple remote function calls, aggregate their results and
> eventually call yet more functions (based on a provided dependency graph).
> Hence, this orchestrator function has state per event_id and each function
> instance is short-lived (a couple seconds at most, ideally sub-second). The
> question then is not about how to modify a long-running function instance
> (which PubSub would enable), but rather how to have the dependency graph
> available to new functions.
>
> Given this, Igal's answer seems promising because we have the
> FunctionProvider instantiating a local variable and passing it down on
> every instantiation. I'm assuming there is one FunctionProvider per
> TaskManager. Is there an easy way to have the FunctionProvider receiving
> data coming from a Flink DataStream, or receiving StateFun messages?
> Otherwise, I could have it subscribe to a Kafka topic directly.
>
> I really appreciate your help.
>
> Miguel
>
> Igal Shilman  wrote on Monday, 22/02/2021 at 12:09:
>
>> Hi Miguel,
>>
>> I think that there are a couple of ways to achieve this, and it really
>> depends on your specific use case, and the trade-offs
>> that you are willing to accept.
>>
>> For example, one way to approach this:
>> - Suppose you have an external service somewhere that returns a
>> representation of the logic to be interpreted by
>> your function at runtime (I think that is the scenario you are describing)
>> - Then, you can write a background task (a thread) that periodically
>> queries that service, and keeps in memory the latest version.
>> - You can initialize this background task in your FunctionProvider
>> implementation, or even in your StatefulModule if you wish.
>> - Then, make sure that your dynamic stateful function has access to
>> the latest value fetched by your client (for example via a shared reference
>> like a j.u.c.AtomicReference)
>> - Then on receive, you can simply get that reference and re-apply your
>> rules.
>>
>> Take a look at [1] for example (it is not exactly the same, but I believe
>> that it is close enough)
>>
>> [1]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>
>> Good luck,
>> Igal.
>>
>>
>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
>>> primitive in StateFun:
>>> https://issues.apache.org/jira/browse/FLINK-16319
>>>
>>> This is probably what you are looking for. And I do agree, in the case
>>> that the control stream (which updates the application logic) is high
>>> volume, redeploying functions may not work well.
>>>
>>> I don't think there really is a "recommended" way of doing the
>>> "broadcast control stream, join with main stream" pattern with StateFun at
>>> the moment, at least without FLINK-16319.
>>> On the other hand, it could be possible to use stateful functions to
>>> implement a pub-sub model in user space for the time being. I've actually
>>> left some ideas for implementing that in the comments of FLINK-16319.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo 
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> What is the recommended way of achieving the equivalent of a broadcast
>>>> in Flink when using Stateful Functions?
>>>>
>>>> For instance, assume we are implementing something similar to Flink's
>>>> demo fraud detection but in Stateful Functions - how can one dynamically
>>>> update the application's logic then?

Re: [Statefun] Dynamic behavior

2021-02-23 Thread Miguel Araújo
Hi Gordon, Igal,

Thanks for your replies.
PubSub would be a good addition, I have a few scenarios where that would be
useful.

However, after reading your answers I realized that your proposed solutions
(which address the most obvious interpretation of my question) do not
necessarily solve my problem. I should have just stated what it was,
instead of trying to propose a solution by discussing broadcast...

I'm trying to implement an "orchestrator" function which, given an event,
will trigger multiple remote function calls, aggregate their results and
eventually call yet more functions (based on a provided dependency graph).
Hence, this orchestrator function has state per event_id and each function
instance is short-lived (a couple seconds at most, ideally sub-second). The
question then is not about how to modify a long-running function instance
(which PubSub would enable), but rather how to have the dependency graph
available to new functions.

Given this, Igal's answer seems promising because we have the
FunctionProvider instantiating a local variable and passing it down on
every instantiation. I'm assuming there is one FunctionProvider per
TaskManager. Is there an easy way to have the FunctionProvider receiving
data coming from a Flink DataStream, or receiving StateFun messages?
Otherwise, I could have it subscribe to a Kafka topic directly.
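
For what it's worth, here is a minimal sketch of Igal's suggestion for the
embedded (JVM) SDK. The DependencyGraph and OrchestratorFunction types,
fetchGraph(), and the polling interval are assumptions for illustration:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;

public class OrchestratorProvider implements StatefulFunctionProvider {

    // Shared, immutable snapshot of the dependency graph; the background
    // thread replaces it and every function instance reads it on receive.
    private final AtomicReference<DependencyGraph> graph = new AtomicReference<>();

    public OrchestratorProvider() {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> graph.set(fetchGraph()),
            0, 60, TimeUnit.SECONDS);
    }

    @Override
    public StatefulFunction functionOfType(FunctionType type) {
        // Each new (short-lived) function instance shares the same reference,
        // so it always sees the latest fetched graph.
        return new OrchestratorFunction(graph);
    }

    private DependencyGraph fetchGraph() {
        // Query the external service, or consume from a Kafka-backed cache.
        return null; // placeholder for the assumed external fetch
    }
}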

I really appreciate your help.

Miguel

Igal Shilman  wrote on Monday, 22/02/2021 at 12:09:

> Hi Miguel,
>
> I think that there are a couple of ways to achieve this, and it really
> depends on your specific use case, and the trade-offs
> that you are willing to accept.
>
> For example, one way to approach this:
> - Suppose you have an external service somewhere that returns a
> representation of the logic to be interpreted by
> your function at runtime (I think that is the scenario you are describing)
> - Then, you can write a background task (a thread) that periodically
> queries that service, and keeps in memory the latest version.
> - You can initialize this background task in your FunctionProvider
> implementation, or even in your StatefulModule if you wish.
> - Then, make sure that your dynamic stateful function has access to the
> latest value fetched by your client (for example via a shared reference
> like a j.u.c.AtomicReference)
> - Then on receive, you can simply get that reference and re-apply your
> rules.
>
> Take a look at [1] for example (it is not exactly the same, but I believe
> that it is close enough)
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>
> Good luck,
> Igal.
>
>
> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
>> primitive in StateFun:
>> https://issues.apache.org/jira/browse/FLINK-16319
>>
>> This is probably what you are looking for. And I do agree, in the case
>> that the control stream (which updates the application logic) is high
>> volume, redeploying functions may not work well.
>>
>> I don't think there really is a "recommended" way of doing the "broadcast
>> control stream, join with main stream" pattern with StateFun at the moment,
>> at least without FLINK-16319.
>> On the other hand, it could be possible to use stateful functions to
>> implement a pub-sub model in user space for the time being. I've actually
>> left some ideas for implementing that in the comments of FLINK-16319.
>>
>> Cheers,
>> Gordon
>>
>>
>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> What is the recommended way of achieving the equivalent of a broadcast
>>> in Flink when using Stateful Functions?
>>>
>>> For instance, assume we are implementing something similar to Flink's
>>> demo fraud detection
>>> <https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
>>> in Stateful Functions - how can one dynamically update the application's
>>> logic then?
>>> There was a similar question in this mailing list in the past where it
>>> was recommended moving the dynamic logic to a remote function
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html>
>>>  so
>>> that one could achieve that by deploying a new container. I think that's
>>> not very realistic as updates might happen with a frequency that's not
>>> compatible with that approach (e.g., sticking to the fraud detection
>>> example, updating fraud detection rules every hour is not unusual), nor
>>> should one be deploying a new container when data (not code) changes.
>>>
>>> Is there a way of, for example, modifying FunctionProviders
>>> <https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
>>> on the fly?
>>>
>>> Thanks,
>>> Miguel
>>>
>>


[Statefun] Dynamic behavior

2021-02-21 Thread Miguel Araújo
Hi everyone,

What is the recommended way of achieving the equivalent of a broadcast in
Flink when using Stateful Functions?

For instance, assume we are implementing something similar to Flink's demo
fraud detection
<https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
in Stateful Functions - how can one dynamically update the application's
logic then?
There was a similar question in this mailing list in the past where it
was recommended moving the dynamic logic to a remote function
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html>
so
that one could achieve that by deploying a new container. I think that's
not very realistic as updates might happen with a frequency that's not
compatible with that approach (e.g., sticking to the fraud detection
example, updating fraud detection rules every hour is not unusual), nor
should one be deploying a new container when data (not code) changes.

Is there a way of, for example, modifying FunctionProviders
<https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
on the fly?

Thanks,
Miguel