Hi David,

That was perfect and it looks like this is working as I'd expected. I put
together some larger integration tests for my specific use-case (multiple
Elasticsearch clusters running in TestContainers) and verified that
messages were being routed dynamically to the appropriate sinks. I forked
the Flink repo last night and was trying to figure out the best place to
start adding these classes in (I noticed that there were three separate ES
packages targeting 5/6/7 respectively). I was going to try to start
fleshing the initial implementation for this, but wanted to make sure that
I was starting in the right place.

Additionally, do you know of anything that might be similar to this even
within other sinks? Just trying to think of something to model this after.
Once I get things started, I'll spin up a JIRA issue for it and go from
there.

Thanks so much for your help!

Rion

On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:

> Hi Rion,
>
> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
> before opening the child sink. Please see *AbstractRichFunction* [1]
> (that EleasticsearchSink extends) for more details.
>
> One more note, instead of starting with integration test, I'd recommend
> writing a unit test using *operator test harness* [2] first. This should
> help you to discover / debug many issues upfront. You can use
> *ElasticsearchSinkBaseTest* [3] as an example.
>
> [1]
> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> [3]
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>
> Best,
> D.
>
> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Hi David,
>>
>> Thanks again for the response, I believe that I'm getting pretty close
>> for at least a POC-level implementation of this. Currently, I'm working
>> with JsonObject instances throughout the pipeline, so I wanted to try this
>> out and simply stored the routing information within the element itself for
>> simplicity's sake right now, so it has a shape that looks something like
>> this:
>>
>> {
>>     "route": {
>>         "hosts": "...",
>>         "index": "...",
>>         ...
>>     },
>>     "all-other-fields-here"
>> }
>>
>> And I've stripped back several of the layers of the routers (since I
>> already have all of the information in the element at that point). I tried
>> using something like this:
>>
>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), 
>> CheckpointedFunction {
>>     private val sinkRoutes: MutableMap<String, 
>> ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>     private lateinit var configuration: Configuration
>>
>>     override fun open(parameters: Configuration) {
>>         configuration = parameters
>>     }
>>
>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>         val route = getHost(element)
>>         // Check if we already have a router for this cluster
>>         var sink = sinkRoutes[route]
>>         if (sink == null) {
>>             // If not, create one
>>             sink = buildSinkFromRoute(element)
>>             sink.open(configuration)
>>             sinkRoutes[route] = sink
>>         }
>>
>>         sink.invoke(element, context)
>>     }
>>
>>     override fun initializeState(context: FunctionInitializationContext) {
>>         // No-op.
>>     }
>>
>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>         // This is used only to flush pending writes.
>>         for (sink in sinkRoutes.values) {
>>             sink.snapshotState(context)
>>         }
>>     }
>>
>>     override fun close() {
>>         for (sink in sinkRoutes.values) {
>>             sink.close()
>>         }
>>     }
>>
>>     private fun buildSinkFromRoute(element: JsonObject, ho): 
>> ElasticsearchSink<JsonObject> {
>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>             buildHostsFromElement(element),
>>             ElasticsearchRoutingFunction()
>>         )
>>
>>         builder.setBulkFlushMaxActions(1)
>>
>>         // TODO: Configure authorization if available
>> //        builder.setRestClientFactory { restClient ->
>> //            restClient.setHttpClientConfigCallback(object : 
>> RestClientBuilder.HttpClientConfigCallback {
>> //                override fun customizeHttpClient(builder: 
>> HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>> //                    // Configure authorization here
>> //                    val credentialsProvider = 
>> BasicCredentialsProvider().apply {
>> //                        setCredentials(
>> //                            AuthScope.ANY,
>> //                            UsernamePasswordCredentials("$USERNAME", 
>> "$PASSWORD")
>> //                        )
>> //                    }
>> //
>> //                    return 
>> builder.setDefaultCredentialsProvider(credentialsProvider);
>> //                }
>> //            })
>> //        }
>>
>>         return builder.build()
>>     }
>>
>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>         val transportAddresses = element
>>             .get("route").asJsonObject
>>             .get("hosts").asString
>>
>>         // If there are multiple, they should be comma-delimited
>>         val addresses = transportAddresses.split(",")
>>         return addresses
>>             .filter { address -> address.isNotEmpty() }
>>             .map { address ->
>>                 HttpHost.create(address)
>>             }
>>     }
>>
>>     private fun getHost(element: JsonObject): String {
>>         return element
>>             .get("route").asJsonObject
>>             .get("hosts").asString
>>     }
>>
>>     private class ElasticsearchRoutingFunction: 
>> ElasticsearchSinkFunction<JsonObject> {
>>         override fun process(element: JsonObject, context: RuntimeContext, 
>> indexer: RequestIndexer) {
>>             indexer.add(request(element))
>>         }
>>
>>         private fun request(element: JsonObject): IndexRequest {
>>             // Access routing information
>>             val index = element
>>                 .get("route").asJsonObject
>>                 .get("index").asString
>>
>>             // Strip off routing information
>>             element.remove("route")
>>
>>             // Send the request
>>             return Requests.indexRequest()
>>                 .index(index)
>>                 .type("_doc")
>>                 .source(mapOf(
>>                     "data" to "$element"
>>                 ))
>>         }
>>     }
>> }
>>
>> After running an integration test, I keep encountering running into the
>> following error during the invocation of the child sink:
>>
>> // The runtime context has not been initialized.
>> sink.invoke(element, context)
>>
>> I can see the underlying sink getting initialized, the open call being
>> made, etc. however for some reason it looks like there's an issue related
>> to the context during the invoke call namely* "The runtime context has
>> not been initialized". *I had assumed this would be alright since the
>> context for the "wrapper" should have already been initialized, but maybe
>> there's something that I'm missing.
>>
>> Also, please forgive any hastily written or nasty code as this is purely
>> a POC to see if I could get this to work using a single object. I have the
>> hopes of cleaning it up and genericizing it after I am confident that it
>> actually works.
>>
>> Thanks so much again,
>>
>> Rion
>>
>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> Sorry for late reply, I've missed your previous message. Thanks Arvid
>>> for the reminder <3.
>>>
>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>> connection from the ConfigurationT element and handle caching it and
>>>> then just grab the element and send it on it's way?
>>>
>>>
>>> Yes, this is exactly what I had in mind. There should be almost no
>>> overhead as sink can be easily chained with your join
>>> (KeyedCoProcessFunction) function.
>>>
>>>    -
>>>    -
>>>>
>>>>    The shape of the elements being evicted from the process function
>>>>    (Is a simple wrapper with the configuration for the sink enough here? 
>>>> Do I
>>>>    need to explicitly initialize the sink within this function? Etc.)
>>>
>>>    -
>>>    - To write an element you need a configuration for the destination
>>>    and the element itself, so a tuple of *(ElasticConfiguration,
>>>    Element)* should be enough (that's basically your 
>>> MessageWrapper<ElementT,
>>>    ConfigurationT> class).
>>>
>>>
>>>    -
>>>    -
>>>>
>>>>    The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>    just be something like an *.addSink(**DynamicElasticSearchSink<**String,
>>>>    Configuration>())* or perhaps something else entirely?)
>>>
>>>    -
>>>
>>> I guess it could look something like the snippet below. It would be
>>> definitely good to play around with the *DynamicElasticSearchSink* API
>>> and make it more meaningful / user friendly (the gist I've shared was just
>>> a very rough prototype to showcase the idea).
>>>
>>>
>>>    - static class Destination {
>>>
>>>        private final List<HttpHost> httpHosts;
>>>
>>>        Destination(List<HttpHost> httpHosts) {
>>>            this.httpHosts = httpHosts;
>>>        }
>>>    }
>>>    -
>>>    - final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>    toWrite.addSink(
>>>            new DynamicElasticsearchSink<>(
>>>                    new SinkRouter<
>>>                            Tuple2<Destination, String>,
>>>                            String,
>>>                            ElasticsearchSink<Tuple2<Destination,
>>>    String>>>() {
>>>
>>>                        @Override
>>>                        public String getRoute(Tuple2<Destination,
>>>    String> element) {
>>>    -                         // Construct a deterministic unique
>>>    caching key for the destination... (this could be cheaper if you know the
>>>    data)
>>>                            return element.f0.httpHosts.stream()
>>>                                    .map(HttpHost::toHostString)
>>>                                    .collect(Collectors.joining(","));
>>>                        }
>>>
>>>                        @Override
>>>                        public ElasticsearchSink<Tuple2<Destination,
>>>    String>> createSink(
>>>                                String cacheKey, Tuple2<Destination,
>>>    String> element) {
>>>                            return new ElasticsearchSink.Builder<>(
>>>                                            element.f0.httpHosts,
>>>                                            (ElasticsearchSinkFunction<
>>>
>>>    Tuple2<Destination, String>>)
>>>                                                    (el, ctx, indexer)
>>>    -> {
>>>                                                        // Construct
>>>    index request.
>>>                                                        final
>>>    IndexRequest request = ...;
>>>
>>>    indexer.add(request);
>>>                                                    })
>>>                                    .build();
>>>                        }
>>>                    }));
>>>
>>>
>>> I hope this helps ;)
>>>
>>> Best,
>>> D.
>>>
>>>
>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>
>>>> Since this will vary depending on the elements retrieved from a
>>>> separate stream, I'm guessing something like the following would be
>>>> roughly the avenue to continue down:
>>>>
>>>> fun main(args: Array<String>) {
>>>>     val parameters = mergeParametersFromProperties(args)
>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>
>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>     val connectionStream = stream
>>>>         .fromSource(
>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>             WatermarkStrategy.noWatermarks(),
>>>>             "elastic-configs"
>>>>         )
>>>>
>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>     stream
>>>>         .fromSource(
>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>             WatermarkStrategy.noWatermarks(),
>>>>             "messages"
>>>>         )
>>>>         .keyBy { message ->
>>>>             // Key by the tenant in the message
>>>>             message.getTenant()
>>>>         }
>>>>         .connect(
>>>>             // Connect the messages stream with the configurations
>>>>             connectionStream
>>>>         )
>>>>         .process(object : KeyedCoProcessFunction<String, String, String, 
>>>> String>() {
>>>>             // For this key, we need to store all of the previous messages 
>>>> in state
>>>>             // in the case where we don't have a given mapping for this 
>>>> tenant yet
>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>             lateinit var configState: ValueState<String>
>>>>
>>>>             override fun open(parameters: Configuration) {
>>>>                 super.open(parameters)
>>>>                 // Initialize the states
>>>>                 messagesAwaitingConfigState = 
>>>> runtimeContext.getListState(awaitingStateDesc)
>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>             }
>>>>
>>>>             // When an element is received
>>>>             override fun processElement1(message: String, context: 
>>>> Context, out: Collector<String>) {
>>>>                 // Check if we have a mapping
>>>>                 if (configState.value() == null){
>>>>                     // We don't have a mapping for this tenant, store 
>>>> messages until we get it
>>>>                     messagesAwaitingConfigState.add(message)
>>>>                 }
>>>>                 else {
>>>>                     // Output the record with some indicator of the route?
>>>>                     out.collect(message)
>>>>                 }
>>>>             }
>>>>
>>>>             override fun processElement2(config: String, context: Context, 
>>>> out: Collector<String>) {
>>>>                 // If this mapping is for this specific tenant, store it 
>>>> and flush the pending
>>>>                 // records in state
>>>>                 if (config.getTenant() == context.currentKey){
>>>>                     configState.update(config)
>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>                     messagesToFlush.forEach { message ->
>>>>                         out.collect(message)
>>>>                     }
>>>>                 }
>>>>             }
>>>>
>>>>             // State descriptors
>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>                 "messages-awaiting-config",
>>>>                 TypeInformation.of(String::class.java)
>>>>             )
>>>>
>>>>             val configStateDesc = ValueStateDescriptor(
>>>>                 "elastic-config",
>>>>                 TypeInformation.of(String::class.java)
>>>>             )
>>>>         })
>>>>
>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>> }
>>>>
>>>> Basically, connect my tenant-specific configuration stream with my
>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>> guessing what will happen here is rather than directly outputting the
>>>> messages from this process function, I'd construct some type of wrapper
>>>> here with the necessary routing/configuration for the message (obtained via
>>>> the configuration stream) along with the element, which might be something
>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>> elements to the sink, which would create the tenant-specific Elastic
>>>> connection from the ConfigurationT element and handle caching it and
>>>> then just grab the element and send it on it's way?
>>>>
>>>> Those are really the only bits I'm stuck on at the moment:
>>>>
>>>>    1. The shape of the elements being evicted from the process
>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>    Etc.)
>>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>    Configuration>()) or perhaps something else entirely?)
>>>>
>>>> Thanks again so much for the advice thus far David, it's greatly
>>>> appreciated.
>>>>
>>>> Rion
>>>>
>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> To give you a better idea, in high-level I think could look something
>>>>> like this
>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>>
>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>
>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>> unknowns in my end in terms of what a production loads look like but I
>>>>>> think the number of clusters shouldn’t be too large (and will either 
>>>>>> rarely
>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>> that).
>>>>>>
>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>>> that’s something you’d be able to advise on?
>>>>>>
>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>> have the mappings stored in state, is it possible that within the open()
>>>>>> function for this dynamic route to access the state and initialize the
>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>> clusters and dynamically handling indices)?
>>>>>>
>>>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>>>>> the right direction, I may be able to help out.
>>>>>>
>>>>>> Thanks much!
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>>
>>>>>> 
>>>>>> Hi Rion,
>>>>>>
>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>> implement your own ElasticsearchSinkFunction
>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>
>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>> clusters.
>>>>>> - If the elastic clusters are known upfront (before submitting job),
>>>>>> you can easily create multiple elastic sinks and prepend them with a 
>>>>>> simple
>>>>>> filter (this is basically what split operator does).
>>>>>> - If you discover elastics clusters at runtime, this would require
>>>>>> some changes of the current ElasticsearchSink implementation. I think 
>>>>>> this
>>>>>> may be actually as simple as introducing something like
>>>>>> DynamicElasticsearchSink, that could dynamically create and managed 
>>>>>> "child"
>>>>>> sinks. This would probably require some thoughts about how to manage
>>>>>> consumed resources (memory), because number of child sink could be
>>>>>> potentially unbounded. This could be of course simplified if underlying
>>>>>> elastic client already supports that, which I'm not aware of. If you'd 
>>>>>> like
>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>> (I'm able to provide some guidance).
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi folks,
>>>>>>>
>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in 
>>>>>>> Flink
>>>>>>> (or if something downstream might be a better option).
>>>>>>>
>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>
>>>>>>>    - A stream of tenant-specific events
>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>    Elastic cluster information (as each tenant has its own specific 
>>>>>>> Elastic
>>>>>>>    cluster/index)
>>>>>>>
>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>
>>>>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>>>>    these cluster mappings in state (keyed by tenant with cluster info 
>>>>>>> as the
>>>>>>>    value)
>>>>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>>>>    cluster mappings stream.
>>>>>>>    3. I’ll need to an Elasticsearch sink that can route the
>>>>>>>    tenant-specific event data to its corresponding cluster/index from 
>>>>>>> the
>>>>>>>    mapping source.
>>>>>>>
>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster 
>>>>>>> like
>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>
>>>>>>> Any advice would be appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>

Reply via email to