AFAIK there are currently no other sources in Flink that can treat "other
sources" / "destinations" as data. The most complete generic work on this
topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.

I think the best module for the contribution would be "elasticsearch-base",
because this could be easily reused for all ES versions that we currently
support.
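
For reference, here is a minimal sketch of the pattern from the quoted thread
below: a wrapper that lazily creates one child sink per route, hands it the
runtime context before opening it, and caches it for later elements. It is
plain Java with no Flink dependency. StubRuntimeContext, ChildSink,
RoutingSink and RecordingSink are hypothetical stand-ins made up for this
illustration, not Flink classes, so treat it as a rough outline rather than
the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for Flink's RuntimeContext (not the real interface). */
interface StubRuntimeContext {
    String taskName();
}

/** Hypothetical stand-in for a rich sink such as ElasticsearchSink. */
interface ChildSink<T> {
    void setRuntimeContext(StubRuntimeContext context);
    void open();
    void invoke(T element);
    void close();
}

/** Wrapper that lazily creates, opens, and caches one child sink per route. */
final class RoutingSink<T> {
    private final Map<String, ChildSink<T>> children = new HashMap<>();
    private final StubRuntimeContext context;
    private final Function<T, String> routeOf;
    private final Function<T, ChildSink<T>> sinkFactory;

    RoutingSink(
            StubRuntimeContext context,
            Function<T, String> routeOf,
            Function<T, ChildSink<T>> sinkFactory) {
        this.context = context;
        this.routeOf = routeOf;
        this.sinkFactory = sinkFactory;
    }

    void invoke(T element) {
        final String route = routeOf.apply(element);
        ChildSink<T> sink = children.get(route);
        if (sink == null) {
            sink = sinkFactory.apply(element);
            // Hand over the wrapper's context first, then open the child.
            sink.setRuntimeContext(context);
            sink.open();
            children.put(route, sink);
        }
        sink.invoke(element);
    }

    void close() {
        for (ChildSink<T> sink : children.values()) {
            sink.close();
        }
        children.clear();
    }

    int childCount() {
        return children.size();
    }
}

/** In-memory child sink that records what it was asked to write. */
final class RecordingSink implements ChildSink<String> {
    final List<String> written = new ArrayList<>();
    private StubRuntimeContext context;
    private boolean opened;

    @Override
    public void setRuntimeContext(StubRuntimeContext context) {
        this.context = context;
    }

    @Override
    public void open() {
        opened = true;
    }

    @Override
    public void invoke(String element) {
        // Mimics the failure reported in the thread when the context is missing.
        if (context == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        if (!opened) {
            throw new IllegalStateException("The sink has not been opened.");
        }
        written.add(element);
    }

    @Override
    public void close() {
        opened = false;
    }
}
```

In a real contribution the cache would probably also need some bound or
eviction policy, since the number of child sinks can grow with the number of
destinations seen at runtime.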

Best,
D.

On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi David,
>
> That was perfect and it looks like this is working as I'd expected. I put
> together some larger integration tests for my specific use-case (multiple
> Elasticsearch clusters running in TestContainers) and verified that
> messages were being routed dynamically to the appropriate sinks. I forked
> the Flink repo last night and was trying to figure out the best place to
> start adding these classes in (I noticed that there were three separate ES
> packages targeting 5/6/7 respectively). I was going to try to start
> fleshing the initial implementation for this, but wanted to make sure that
> I was starting in the right place.
>
> Additionally, do you know of anything that might be similar to this even
> within other sinks? Just trying to think of something to model this after.
> Once I get things started, I'll spin up a JIRA issue for it and go from
> there.
>
> Thanks so much for your help!
>
> Rion
>
> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Rion,
>>
>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>> before opening the child sink. Please see *AbstractRichFunction* [1]
>> (which ElasticsearchSink extends) for more details.
>>
>> One more note: instead of starting with an integration test, I'd recommend
>> writing a unit test using the *operator test harness* [2] first. This should
>> help you to discover / debug many issues upfront. You can use
>> *ElasticsearchSinkBaseTest* [3] as an example.
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>> [3]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>
>> Best,
>> D.
>>
>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks again for the response. I believe that I'm getting pretty close
>>> to at least a POC-level implementation of this. Currently, I'm working
>>> with JsonObject instances throughout the pipeline, so to try this out I
>>> simply stored the routing information within the element itself for
>>> simplicity's sake. It has a shape that looks something like this:
>>>
>>> {
>>>     "route": {
>>>         "hosts": "...",
>>>         "index": "...",
>>>         ...
>>>     },
>>>     "all-other-fields-here"
>>> }
>>>
>>> And I've stripped back several of the layers of the routers (since I
>>> already have all of the information in the element at that point). I tried
>>> using something like this:
>>>
>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>     private lateinit var configuration: Configuration
>>>
>>>     override fun open(parameters: Configuration) {
>>>         configuration = parameters
>>>     }
>>>
>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>         val route = getHost(element)
>>>         // Check if we already have a router for this cluster
>>>         var sink = sinkRoutes[route]
>>>         if (sink == null) {
>>>             // If not, create one
>>>             sink = buildSinkFromRoute(element)
>>>             sink.open(configuration)
>>>             sinkRoutes[route] = sink
>>>         }
>>>
>>>         sink.invoke(element, context)
>>>     }
>>>
>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>         // No-op.
>>>     }
>>>
>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>         // This is used only to flush pending writes.
>>>         for (sink in sinkRoutes.values) {
>>>             sink.snapshotState(context)
>>>         }
>>>     }
>>>
>>>     override fun close() {
>>>         for (sink in sinkRoutes.values) {
>>>             sink.close()
>>>         }
>>>     }
>>>
>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>             buildHostsFromElement(element),
>>>             ElasticsearchRoutingFunction()
>>>         )
>>>
>>>         builder.setBulkFlushMaxActions(1)
>>>
>>>         // TODO: Configure authorization if available
>>> //        builder.setRestClientFactory { restClient ->
>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>> //                    // Configure authorization here
>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>> //                        setCredentials(
>>> //                            AuthScope.ANY,
>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>> //                        )
>>> //                    }
>>> //
>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>> //                }
>>> //            })
>>> //        }
>>>
>>>         return builder.build()
>>>     }
>>>
>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>         val transportAddresses = element
>>>             .get("route").asJsonObject
>>>             .get("hosts").asString
>>>
>>>         // If there are multiple, they should be comma-delimited
>>>         val addresses = transportAddresses.split(",")
>>>         return addresses
>>>             .filter { address -> address.isNotEmpty() }
>>>             .map { address ->
>>>                 HttpHost.create(address)
>>>             }
>>>     }
>>>
>>>     private fun getHost(element: JsonObject): String {
>>>         return element
>>>             .get("route").asJsonObject
>>>             .get("hosts").asString
>>>     }
>>>
>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>             indexer.add(request(element))
>>>         }
>>>
>>>         private fun request(element: JsonObject): IndexRequest {
>>>             // Access routing information
>>>             val index = element
>>>                 .get("route").asJsonObject
>>>                 .get("index").asString
>>>
>>>             // Strip off routing information
>>>             element.remove("route")
>>>
>>>             // Send the request
>>>             return Requests.indexRequest()
>>>                 .index(index)
>>>                 .type("_doc")
>>>                 .source(mapOf(
>>>                     "data" to "$element"
>>>                 ))
>>>         }
>>>     }
>>> }
>>>
>>> After running an integration test, I keep running into the following
>>> error during the invocation of the child sink:
>>>
>>> // The runtime context has not been initialized.
>>> sink.invoke(element, context)
>>>
>>> I can see the underlying sink getting initialized, the open call being
>>> made, etc. However, for some reason it looks like there's an issue related
>>> to the context during the invoke call, namely *"The runtime context has
>>> not been initialized"*. I had assumed this would be alright since the
>>> context for the "wrapper" should have already been initialized, but maybe
>>> there's something that I'm missing.
>>>
>>> Also, please forgive any hastily written or nasty code as this is purely
>>> a POC to see if I could get this to work using a single object. I have the
>>> hopes of cleaning it up and genericizing it after I am confident that it
>>> actually works.
>>>
>>> Thanks so much again,
>>>
>>> Rion
>>>
>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> Sorry for the late reply, I missed your previous message. Thanks Arvid
>>>> for the reminder <3.
>>>>
>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>> connection from the ConfigurationT element and handle caching it and
>>>>> then just grab the element and send it on its way?
>>>>
>>>>
>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>> overhead, as the sink can be easily chained with your join
>>>> (KeyedCoProcessFunction) function.
>>>>
>>>>> The shape of the elements being evicted from the process function
>>>>> (Is a simple wrapper with the configuration for the sink enough here?
>>>>> Do I need to explicitly initialize the sink within this function? Etc.)
>>>>
>>>> To write an element you need a configuration for the destination
>>>> and the element itself, so a tuple of *(ElasticConfiguration,
>>>> Element)* should be enough (that's basically your
>>>> MessageWrapper<ElementT, ConfigurationT> class).
>>>>
>>>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>> just be something like an
>>>>> *.addSink(DynamicElasticSearchSink<String, Configuration>())*
>>>>> or perhaps something else entirely?)
>>>>
>>>> I guess it could look something like the snippet below. It would
>>>> definitely be good to play around with the *DynamicElasticSearchSink* API
>>>> and make it more meaningful / user friendly (the gist I've shared was just
>>>> a very rough prototype to showcase the idea).
>>>>
>>>>
>>>> static class Destination {
>>>>
>>>>     private final List<HttpHost> httpHosts;
>>>>
>>>>     Destination(List<HttpHost> httpHosts) {
>>>>         this.httpHosts = httpHosts;
>>>>     }
>>>> }
>>>>
>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>> toWrite.addSink(
>>>>         new DynamicElasticsearchSink<>(
>>>>                 new SinkRouter<
>>>>                         Tuple2<Destination, String>,
>>>>                         String,
>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>
>>>>                     @Override
>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>                         // Construct a deterministic unique caching key for the
>>>>                         // destination... (this could be cheaper if you know the data)
>>>>                         return element.f0.httpHosts.stream()
>>>>                                 .map(HttpHost::toHostString)
>>>>                                 .collect(Collectors.joining(","));
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>                         return new ElasticsearchSink.Builder<>(
>>>>                                         element.f0.httpHosts,
>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>                                                 (el, ctx, indexer) -> {
>>>>                                                     // Construct index request.
>>>>                                                     final IndexRequest request = ...;
>>>>                                                     indexer.add(request);
>>>>                                                 })
>>>>                                 .build();
>>>>                     }
>>>>                 }));
>>>>
>>>>
>>>> I hope this helps ;)
>>>>
>>>> Best,
>>>> D.
>>>>
>>>>
>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>
>>>>> Since this will vary depending on the elements retrieved from a
>>>>> separate stream, I'm guessing something like the following would be
>>>>> roughly the avenue to continue down:
>>>>>
>>>>> fun main(args: Array<String>) {
>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>
>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>     val connectionStream = stream
>>>>>         .fromSource(
>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>             "elastic-configs"
>>>>>         )
>>>>>
>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>     stream
>>>>>         .fromSource(
>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>             "messages"
>>>>>         )
>>>>>         .keyBy { message ->
>>>>>             // Key by the tenant in the message
>>>>>             message.getTenant()
>>>>>         }
>>>>>         .connect(
>>>>>             // Connect the messages stream with the configurations
>>>>>             connectionStream
>>>>>         )
>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>             lateinit var configState: ValueState<String>
>>>>>
>>>>>             override fun open(parameters: Configuration) {
>>>>>                 super.open(parameters)
>>>>>                 // Initialize the states
>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>             }
>>>>>
>>>>>             // When an element is received
>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>                 // Check if we have a mapping
>>>>>                 if (configState.value() == null){
>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>                 }
>>>>>                 else {
>>>>>                     // Output the record with some indicator of the route?
>>>>>                     out.collect(message)
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>                 // records in state
>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>                     configState.update(config)
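>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>                     messagesToFlush.forEach { message ->
>>>>>                         out.collect(message)
>>>>>                     }
>>>>>                     // Clear the buffer so these records are not emitted again
>>>>>                     messagesAwaitingConfigState.clear()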
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             // State descriptors
>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>                 "messages-awaiting-config",
>>>>>                 TypeInformation.of(String::class.java)
>>>>>             )
>>>>>
>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>                 "elastic-config",
>>>>>                 TypeInformation.of(String::class.java)
>>>>>             )
>>>>>         })
>>>>>
>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>> }
>>>>>
>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>> corresponding configuration (to avoid race conditions). However, I'm
>>>>> guessing that rather than directly outputting the messages from this
>>>>> process function, I'd construct some type of wrapper here with the
>>>>> necessary routing/configuration for the message (obtained via the
>>>>> configuration stream) along with the element, which might be something
>>>>> like a MessageWrapper<ElementT, ConfigurationT>, and pass those
>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>> connection from the ConfigurationT element, handle caching it, and
>>>>> then just grab the element and send it on its way?
>>>>>
>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>
>>>>>    1. The shape of the elements being evicted from the process
>>>>>    function (Is a simple wrapper with the configuration for the sink
>>>>>    enough here? Do I need to explicitly initialize the sink within
>>>>>    this function? Etc.)
>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>
>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>> appreciated.
>>>>>
>>>>> Rion
>>>>>
>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>>>
>>>>>> To give you a better idea, at a high level I think it could look
>>>>>> something like this [1].
>>>>>>
>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>> think the number of clusters shouldn’t be too large (and will either
>>>>>>> rarely change or have new entries come in at runtime, but it needs to
>>>>>>> support that).
>>>>>>>
>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>> not sure what the dynamic one would look like at the moment though,
>>>>>>> perhaps that’s something you’d be able to advise on?
>>>>>>>
>>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>>> have the mappings stored in state, is it possible, within the open()
>>>>>>> function for this dynamic route, to access the state and initialize the
>>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>>> clusters and dynamically handling indices)?
>>>>>>>
>>>>>>> I’d be happy to take a shot at making the appropriate changes to the
>>>>>>> sink as well, although I’m far from an Elastic expert. If you point me
>>>>>>> in the right direction, I may be able to help out.
>>>>>>>
>>>>>>> Thanks much!
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>>>
>>>>>>> 
>>>>>>> Hi Rion,
>>>>>>>
>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>>
>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>> clusters.
>>>>>>> - If the Elastic clusters are known upfront (before submitting the
>>>>>>> job), you can easily create multiple Elastic sinks and prepend each
>>>>>>> with a simple filter (this is basically what the split operator does).
>>>>>>> - If you discover Elastic clusters at runtime, this would require
>>>>>>> some changes to the current ElasticsearchSink implementation. I think
>>>>>>> this may actually be as simple as introducing something like a
>>>>>>> DynamicElasticsearchSink that could dynamically create and manage
>>>>>>> "child" sinks. This would probably require some thought about how to
>>>>>>> manage consumed resources (memory), because the number of child sinks
>>>>>>> could potentially be unbounded. This could of course be simplified if
>>>>>>> the underlying Elastic client already supports that, which I'm not
>>>>>>> aware of. If you'd like to take this path, it would definitely be a
>>>>>>> great contribution to Flink (I'm able to provide some guidance).
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I have a use-case that I wanted to pose to the mailing list first,
>>>>>>>> as I’m not terribly familiar with the Elasticsearch connector, to
>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in
>>>>>>>> Flink (or if something downstream might be a better option).
>>>>>>>>
>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>
>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>    Elastic cluster information (as each tenant has its own specific
>>>>>>>>    Elastic cluster/index)
>>>>>>>>
>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>
>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>    store these cluster mappings in state (keyed by tenant with
>>>>>>>>    cluster info as the value)
>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>    the cluster mappings stream.
>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>    tenant-specific event data to its corresponding cluster/index
>>>>>>>>    from the mapping source.
>>>>>>>>
>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster
>>>>>>>> like I would need on a per-tenant basis or if there’s a better
>>>>>>>> approach here?
>>>>>>>>
>>>>>>>> Any advice would be appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>
