Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Gordon,

That sounds good. My first thought was that if I have to break up the logic I'd 
end up with:

BroadcastFunction1 --> AsyncFunction --> BroadcastFunction2

...with BroadcastFunction1 & BroadcastFunction2 needing the same broadcast state, and 
that state could change while an item is being processed through the chain. But 
I could leave a marker to do the call like you suggested and have a placeholder 
for the result and that might do the trick.

Thanks again for the suggestion! Below is a pseudo-code example of how I think 
I'll be able to get it to work in case it's helpful for anyone else.

Cheers,
John.



Function:

processElement(item) { //BroadcastFunction

  if (broadcastState.checkInventoryLevel) {
    long inventoryLevel = http://get_the_inventory_level(item.id) // Zz: blocking HTTP call
    if (inventoryLevel < X) {
      ctx.output("reorder-outputTag", item)
    }
  }
  ...
  item.status = "checked";
  collect(item);
}


 ---> broken down into functions A, B & C


FunctionA:

processElement(item) { //BroadcastFunction

  // Only leave a marker here; the HTTP call itself happens in FunctionB.
  if (broadcastState.checkInventoryLevel) {
    collect(Tuple2(item, True))
  } else {
    collect(Tuple2(item, False))
  }
}


FunctionB:

asyncInvoke(Tuple2(item, needsInventory)) { //AsyncFunction

  if (needsInventory) {
    item.inventoryLevel = http://get_the_inventory(item.id)
  }
  collect(item);
}


FunctionC:

processElement(item) { //ProcessFunction, since side outputs need a Context

  if (item.inventoryLevel != null && item.inventoryLevel < X) {
    ctx.output("reorder-outputTag", item)
  }
  item.status = "checked";
  collect(item);
}
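
For anyone who wants to try the A/B/C split outside a cluster, here is a minimal sketch in plain Java. It does not use the Flink API: CompletableFuture stands in for the async operator, a boolean parameter stands in for broadcastState.checkInventoryLevel, and the class, method and field names (MarkerPipelineSketch, Marked, reorderIds, the lookup function) are hypothetical names chosen for illustration. It only shows the idea of recording the broadcast-state decision in stage A so that later stages do not need the broadcast state again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class MarkerPipelineSketch {

    // Hypothetical item type; field names follow the pseudo-code above.
    static final class Item {
        final String id;
        Long inventoryLevel; // stays null unless the lookup runs
        String status;
        Item(String id) { this.id = id; }
    }

    // Output of stage A: the item plus the decision taken from broadcast state.
    static final class Marked {
        final Item item;
        final boolean needsInventory;
        Marked(Item item, boolean needsInventory) {
            this.item = item;
            this.needsInventory = needsInventory;
        }
    }

    // Stage A (stands in for the BroadcastProcessFunction): record the decision, no I/O.
    static Marked functionA(Item item, boolean checkInventoryLevel) {
        return new Marked(item, checkInventoryLevel);
    }

    // Stage B (stands in for the AsyncFunction): run the lookup only when marked.
    static CompletableFuture<Item> functionB(Marked marked, Function<String, Long> lookup) {
        if (!marked.needsInventory) {
            return CompletableFuture.completedFuture(marked.item);
        }
        return CompletableFuture
                .supplyAsync(() -> lookup.apply(marked.item.id))
                .thenApply(level -> {
                    marked.item.inventoryLevel = level;
                    return marked.item;
                });
    }

    // Stage C: route low-stock items to the reorder list, then mark the item as checked.
    static void functionC(Item item, long threshold, List<Item> reorder, List<Item> out) {
        if (item.inventoryLevel != null && item.inventoryLevel < threshold) {
            reorder.add(item);
        }
        item.status = "checked";
        out.add(item);
    }

    // Runs the ids through A -> B -> C and returns the ids sent to the reorder output.
    static List<String> reorderIds(List<String> ids, boolean checkInventoryLevel,
                                   long threshold, Function<String, Long> lookup) {
        List<CompletableFuture<Item>> pending = new ArrayList<>();
        for (String id : ids) {
            pending.add(functionB(functionA(new Item(id), checkInventoryLevel), lookup));
        }
        List<Item> reorder = new ArrayList<>();
        List<Item> out = new ArrayList<>();
        for (CompletableFuture<Item> f : pending) {
            functionC(f.join(), threshold, reorder, out); // join in submission order
        }
        List<String> result = new ArrayList<>();
        for (Item i : reorder) {
            result.add(i.id);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> stock = Map.of("a", 2L, "b", 50L); // fake inventory service
        System.out.println(reorderIds(List.of("a", "b"), true, 10L, stock::get));  // prints [a]
        System.out.println(reorderIds(List.of("a", "b"), false, 10L, stock::get)); // prints []
    }
}
```

Because the marker travels with the item, a change to the flag after stage A does not affect items already in flight, which is the concern raised at the top of this message.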



From: Tzu-Li (Gordon) Tai 
Sent: Tuesday 17 March 2020 10:05
To: user@flink.apache.org 
Subject: Re: RichAsyncFunction + BroadcastProcessFunction

Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate extra external HTTP requests need to be performed, and have them
consumed by a downstream async IO operator to complete the HTTP request?
That could work depending on what exactly you need to do in your specific
case.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Flink Users,

I have a BroadcastProcessFunction, and in its processElement method I sometimes 
need to make HTTP requests, depending on the broadcast state.

Because I'm making HTTP requests, I'd prefer the function to be async, like 
RichAsyncFunction.asyncInvoke(), but RichAsyncFunction doesn't support 
broadcast data.

Is there any way to combine the functionality of a RichAsyncFunction + a 
BroadcastProcessFunction?

Thanks!
John.