Handling HTTP Requests for Keyed Streams

2021-08-16 Thread Rion Williams
Hi all,

I've been exploring a few different options for storing tenant-specific
configurations within Flink state based on the messages I have flowing
through my job. Initially I had considered creating a source that would
periodically poll an HTTP API and connect that stream to my original event
stream.

However, I realized that this configuration information would basically
never change, so it doesn't make much sense to poll frequently. My next
approach would be a function keyed by tenant that stores the configuration
for that tenant in state (and issues an HTTP call when the state is empty).
Something like this:

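import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// JsonObject comes from whichever JSON library the job uses (import omitted).
// Assumptions: the tenant key is a String, and the configuration is kept in
// state as the raw response body.
class ConfigurationLookupFunction : KeyedProcessFunction<String, JsonObject, JsonObject>() {
    private lateinit var httpClient: HttpClient
    // Tenant specific configuration, scoped to the current key
    private lateinit var configuration: ValueState<String>

    override fun open(parameters: Configuration) {
        super.open(parameters)
        httpClient = HttpClient.newHttpClient()
        // Keyed state has to be registered before it can be read or updated
        configuration = runtimeContext.getState(
            ValueStateDescriptor("configuration", String::class.java))
    }

    override fun processElement(message: JsonObject, context: Context,
                                out: Collector<JsonObject>) {
        if (configuration.value() == null) {
            // Issue a (blocking) request to the appropriate API to load the configuration
            val request =
                HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
            val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
            // Store the configuration info within state here
            configuration.update(response.body())
        }
        // Get the configuration information and pass it
        // downstream to be used by the sink
        out.collect(message)
    }
}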

I didn't see any support for using the Async I/O functions from a keyed
context, otherwise I'd imagine that would be ideal. The requests themselves
should be very infrequent (initial call per tenant) and I'd imagine after
that the necessary configuration could be pulled/stored within the state
for that key.

Is there a good way of handling this that I might be overlooking with an
existing Flink construct or function? I'd love to be able to leverage the
Async I/O connectors as they seem pretty well thought out.

Thanks in advance!

Rion


Re: Handling HTTP Requests for Keyed Streams

2021-08-16 Thread Caizhi Weng
Hi!

As you mentioned that the configuration fetching is very infrequent, why
don't you use a blocking approach to send HTTP requests and receive
responses? This seems like a more reasonable solution to me.


Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread Rion Williams
Hi Caizhi,

I don’t mind the request being synchronous (or not using the Async I/O
connectors). Assuming I go down that route, would this be the appropriate way
to handle it? Specifically, creating an HttpClient in a function on a keyed
stream, issuing the request only when the state for the key is empty, and
storing the result in state?

It makes sense to me; I'm just wondering if there are any gotchas or
recommendations in terms of a client that might support things like retries,
and whether this is a good pattern to accomplish this.
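
On the retry point, one possible shape is a small wrapper around the blocking
call. This is only a sketch: withRetries, maxAttempts and initialBackoffMillis
are hypothetical names, not part of Flink or of java.net.http. It retries on
IOException only, so that an InterruptedException from the task thread is not
swallowed. The attempt count and backoff should stay small, because the
operator is blocked while it waits.

```kotlin
import java.io.IOException

// Hypothetical helper: runs a blocking call, retrying on IOException with
// exponential backoff. The call receives the 1-based attempt number.
fun <T> withRetries(maxAttempts: Int, initialBackoffMillis: Long, call: (attempt: Int) -> T): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var backoff = initialBackoffMillis
    var lastError: IOException? = null
    for (attempt in 1..maxAttempts) {
        try {
            return call(attempt)
        } catch (e: IOException) {
            lastError = e
            if (attempt < maxAttempts) {
                // Wait before the next attempt, doubling the delay each time
                Thread.sleep(backoff)
                backoff *= 2
            }
        }
    }
    throw IllegalStateException("Call failed after $maxAttempts attempts", lastError)
}
```

Inside processElement the lambda passed to withRetries would wrap the
httpClient.send(...) call, so a transient failure is retried a bounded number
of times before the job fails.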

Thanks,

Rion


Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread JING ZHANG
Hi Rion,
Your solution is good.

It seems that you need to enrich a stream with data queried from an external
HTTP service. There is another solution for reference, similar to the
mechanism of lookup join in Flink SQL.
Lookup join in Flink SQL supports two modes: async and sync.
For each input record from the original source, it looks up the key in the
dimension table.
To avoid frequent external I/O, some dimension sources keep a cache in memory.
E.g. the HBase dimension table source uses an in-memory LRU cache that holds
the most recently used values. If the input key hits the cache, external I/O
is avoided; otherwise an external call is triggered and the result is stored
in the LRU cache.
E.g. the Hive dimension table source loads all data into an in-memory cache,
which is refreshed regularly according to the specified interval.
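
The LRU caching idea described above can be sketched as follows. This is a
minimal illustration built on java.util.LinkedHashMap, not the cache that the
Flink HBase connector actually uses; LruCache, getOrLoad and loads are
hypothetical names, and the loader stands in for the external call.

```kotlin
// Hypothetical helper, not a Flink class: a small LRU cache on top of LinkedHashMap.
class LruCache<K, V : Any>(private val capacity: Int) {
    // accessOrder = true keeps entries ordered from least to most recently used
    private val cached = object : LinkedHashMap<K, V>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>): Boolean =
            size > capacity
    }

    // Number of lookups that missed the cache and went to the loader
    var loads = 0
        private set

    // Returns the cached value, or calls loader (the external I/O) and caches the result
    fun getOrLoad(key: K, loader: (K) -> V): V {
        cached[key]?.let { return it }
        loads++
        val value = loader(key)
        cached[key] = value
        return value
    }
}
```

Unlike keyed state, a cache like this lives only in the memory of one task and
is not checkpointed, so entries are fetched again after a restart.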

Hope the information is helpful.

Best,
JING ZHANG

