Hi Gordon,
That sounds good. My first thought was that if I have to break up the logic I'd
end up with:
BroadcastFunction1 --> AsyncFunction --> BroadcastFunction2
...with Broadcast1 & BroadcastFunction2 needing the same broadcast state, and
that state could change while an item is being processed through the chain. But
I could leave a marker to do the call like you suggested and have a placeholder
for the result and that might do the trick.
Thanks again for the suggestion! Below is a sudo-code example of how I think
I'll be able to get it to work in case it's helpful for anyone else.
Cheers,
John.
Function:
processElement(item) { //BroadcastFunction
if (broadcastState.checkInventoryLevel) {
long inventoryLevel = http://get_the_inventory_level(item.id) // Zz
if (item.inventory < X) {
ctx.output("reorder-outputTag", item)
}
}
...
item.status = "checked";
collect(item);
}
---> broken down in to functions A, B & C
FunctionA:
processElement(item) { //BroadcastFunction
if (broadcastState.checkInventoryLevel) {
collect(Tuple2(item, True))
item.inventoryLevel = http://get_the_inventory(item.id)
} else {
collect(Tuple2(item, False))
}
FunctionB:
asyncInvoke(Tuple2) { //AsyncFunction
if (needsInventory)
item.inventoryLevel = http://get_the_inventory(item.id)
}
collect(item);
FunctionC:
processElement(item) { //FlatMapFunction
if (item.inventory != null && item.inventory < X) {
ctx.output("reorder-outputTag", item)
}
item.status = "checked";
collect(item);
}
From: Tzu-Li (Gordon) Tai
Sent: Tuesday 17 March 2020 10:05
To: user@flink.apache.org
Subject: Re: RichAsyncFunction + BroadcastProcessFunction
Hi John,
Have you considered letting the BroadcastProcessFunction output events that
indicate extra external HTTP requests needs to be performed, and have them
consumed by a downstream async IO operator to complete the HTTP request?
That could work depending on what exactly you need to do in your specific
case.
Cheers,
Gordon
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/