Hi!

Since you mentioned that the configuration is fetched very infrequently, why
not use a blocking call to send the HTTP request and wait for the response?
That seems like the more reasonable solution to me.
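
Here is a minimal sketch of what I mean, not a definitive implementation. It
assumes the Flink 1.x API (open(Configuration)), Java 11's java.net.http
client, and that JsonObject is Gson's (swap in whatever JSON type your job
uses). The class name, the baseUrl parameter, the state name and the
status-code check are hypothetical choices of mine, not something from your
job.

```kotlin
import com.google.gson.JsonObject // assumption: the job's JsonObject is Gson's
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Hypothetical name; baseUrl is an assumed constructor parameter pointing at
// the configuration API (the real URL is elided in the original message).
class BlockingConfigurationLookupFunction(
    private val baseUrl: String
) : KeyedProcessFunction<String, JsonObject, JsonObject>() {

    // Both fields are created in open(), so they never need to be serialized.
    @Transient private lateinit var httpClient: HttpClient
    @Transient private lateinit var configuration: ValueState<String>

    override fun open(parameters: Configuration) {
        super.open(parameters)
        httpClient = HttpClient.newHttpClient()
        // Keyed state: one configuration value per tenant key.
        configuration = runtimeContext.getState(
            ValueStateDescriptor("tenant-configuration", String::class.java)
        )
    }

    override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
        if (configuration.value() == null) {
            // First message for this tenant: fetch synchronously. send() blocks
            // this subtask until the response arrives.
            val request = HttpRequest
                .newBuilder(URI.create("$baseUrl/${context.currentKey}"))
                .build()
            val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
            if (response.statusCode() != 200) {
                // Assumed failure policy: fail the record rather than cache a bad value.
                throw IllegalStateException(
                    "Configuration lookup for ${context.currentKey} returned ${response.statusCode()}"
                )
            }
            configuration.update(response.body())
        }
        // The configuration is now in keyed state; pass the message downstream.
        out.collect(message)
    }
}
```

Because the call is synchronous, the state update happens on the task thread
while the key context is still set, which is what makes it safe inside a keyed
function. The cost is that the subtask processes nothing else while it waits,
which should be acceptable if the request really happens only once per tenant.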

Rion Williams <rionmons...@gmail.com> wrote on Tue, Aug 17, 2021 at 4:00 AM:

> Hi all,
>
> I've been exploring a few different options for storing tenant-specific
> configurations within Flink state based on the messages I have flowing
> through my job. Initially I had considered creating a source that would
> periodically poll an HTTP API and connect that stream to my original event
> stream.
>
> However, I realized that this configuration information would basically
> never change and thus it doesn't quite make sense to poll so frequently. My
> next approach would be to have a function that is keyed (by tenant) and
> stores the configuration for that tenant in state (issuing an HTTP call
> when it does not have it). Something like this:
>
> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
>     // Tenant specific configuration
>     private lateinit var httpClient: HttpClient
>     private lateinit var configuration: ValueState<String>
>
>     override fun open(parameters: Configuration) {
>         super.open(parameters)
>         httpClient = HttpClient.newHttpClient()
>     }
>
>     override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
>         if (configuration.value() == null) {
>             // Issue a request to the appropriate API to load the configuration
>             val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>             httpClient.send(..., {
>                 // Store the configuration info within state here
>                 configuration.update(...)
>             })
>
>             out.collect(message)
>         }
>         else {
>             // Get the configuration information and pass it downstream to be used by the sink
>             out.collect(message)
>         }
>     }
> }
>
> I didn't see any support for using the Async I/O functions from a keyed
> context, otherwise I'd imagine that would be ideal. The requests themselves
> should be very infrequent (initial call per tenant) and I'd imagine after
> that the necessary configuration could be pulled/stored within the state
> for that key.
>
> Is there a good way of handling this that I might be overlooking with an
> existing Flink construct or function? I'd love to be able to leverage the
> Async I/O connectors as they seem pretty well thought out.
>
> Thanks in advance!
>
> Rion
>
>
>
