AFAIK there are currently no other sources in Flink that can treat "other sources" / "destinations" as data. The most complete generic work on this topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.

I think the best module for the contribution would be "elasticsearch-base", because this could be easily reused for all ES versions that we currently support.

Best,
D.

On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi David,
>
> That was perfect and it looks like this is working as I'd expected. I put
> together some larger integration tests for my specific use-case (multiple
> Elasticsearch clusters running in TestContainers) and verified that
> messages were being routed dynamically to the appropriate sinks. I forked
> the Flink repo last night and was trying to figure out the best place to
> start adding these classes in (I noticed that there were three separate ES
> packages targeting 5/6/7 respectively). I was going to try to start
> fleshing out the initial implementation for this, but wanted to make sure
> that I was starting in the right place.
>
> Additionally, do you know of anything that might be similar to this even
> within other sinks? Just trying to think of something to model this after.
> Once I get things started, I'll spin up a JIRA issue for it and go from
> there.
>
> Thanks so much for your help!
>
> Rion
>
> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Rion,
>>
>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>> before opening the child sink. Please see *AbstractRichFunction* [1]
>> (that ElasticsearchSink extends) for more details.
>>
>> One more note, instead of starting with an integration test, I'd recommend
>> writing a unit test using the *operator test harness* [2] first. This should
>> help you to discover / debug many issues upfront. You can use
>> *ElasticsearchSinkBaseTest* [3] as an example.
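
To illustrate the ordering described above (hand the wrapper's runtime context to the child sink before opening it), here is a framework-free Java sketch. FakeRuntimeContext, FakeRichSink, RecordingSink and DynamicSink are hypothetical stand-ins, not Flink classes; they only mimic the contract that the context must be set before it is used. The "route|payload" element format is likewise an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a runtime context (not Flink's RuntimeContext).
class FakeRuntimeContext {
    final String taskName;
    FakeRuntimeContext(String taskName) { this.taskName = taskName; }
}

// Hypothetical stand-in that mimics the "set context before use" contract.
abstract class FakeRichSink<T> {
    private FakeRuntimeContext runtimeContext;

    void setRuntimeContext(FakeRuntimeContext ctx) { this.runtimeContext = ctx; }

    FakeRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() { }

    abstract void invoke(T element);
}

// Child sink that needs the context on every invoke.
class RecordingSink extends FakeRichSink<String> {
    final List<String> written = new ArrayList<>();

    @Override
    void invoke(String element) {
        written.add(getRuntimeContext().taskName + ":" + element);
    }
}

// Wrapper that creates child sinks lazily, one per route.
class DynamicSink extends FakeRichSink<String> {
    final Map<String, RecordingSink> children = new HashMap<>();

    @Override
    void invoke(String element) {
        // Assumed element format: "<route>|<payload>".
        String route = element.substring(0, element.indexOf('|'));
        RecordingSink child = children.get(route);
        if (child == null) {
            child = new RecordingSink();
            // Pass the wrapper's context down first, then open the child.
            child.setRuntimeContext(getRuntimeContext());
            child.open();
            children.put(route, child);
        }
        child.invoke(element);
    }
}
```

Without the setRuntimeContext call, the first child.invoke(...) in this sketch fails with the same IllegalStateException message that was reported for the POC.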
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>> [3]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>
>> Best,
>> D.
>>
>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks again for the response, I believe that I'm getting pretty close
>>> for at least a POC-level implementation of this. Currently, I'm working
>>> with JsonObject instances throughout the pipeline, so I wanted to try this
>>> out and simply stored the routing information within the element itself for
>>> simplicity's sake right now, so it has a shape that looks something like
>>> this:
>>>
>>> {
>>>     "route": {
>>>         "hosts": "...",
>>>         "index": "...",
>>>         ...
>>>     },
>>>     "all-other-fields-here"
>>> }
>>>
>>> And I've stripped back several of the layers of the routers (since I
>>> already have all of the information in the element at that point). I tried
>>> using something like this:
>>>
>>> class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>     private lateinit var configuration: Configuration
>>>
>>>     override fun open(parameters: Configuration) {
>>>         configuration = parameters
>>>     }
>>>
>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>         val route = getHost(element)
>>>         // Check if we already have a router for this cluster
>>>         var sink = sinkRoutes[route]
>>>         if (sink == null) {
>>>             // If not, create one
>>>             sink = buildSinkFromRoute(element)
>>>             sink.open(configuration)
>>>             sinkRoutes[route] = sink
>>>         }
>>>
>>>         sink.invoke(element, context)
>>>     }
>>>
>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>         // No-op.
>>>     }
>>>
>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>         // This is used only to flush pending writes.
>>>         for (sink in sinkRoutes.values) {
>>>             sink.snapshotState(context)
>>>         }
>>>     }
>>>
>>>     override fun close() {
>>>         for (sink in sinkRoutes.values) {
>>>             sink.close()
>>>         }
>>>     }
>>>
>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>             buildHostsFromElement(element),
>>>             ElasticsearchRoutingFunction()
>>>         )
>>>
>>>         builder.setBulkFlushMaxActions(1)
>>>
>>>         // TODO: Configure authorization if available
>>>         // builder.setRestClientFactory { restClient ->
>>>         //     restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>         //         override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>         //             // Configure authorization here
>>>         //             val credentialsProvider = BasicCredentialsProvider().apply {
>>>         //                 setCredentials(
>>>         //                     AuthScope.ANY,
>>>         //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>         //                 )
>>>         //             }
>>>         //
>>>         //             return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>         //         }
>>>         //     })
>>>         // }
>>>
>>>         return builder.build()
>>>     }
>>>
>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>>>         val transportAddresses = element
>>>             .get("route").asJsonObject
>>>             .get("hosts").asString
>>>
>>>         // If there are multiple, they should be comma-delimited
>>>         val addresses = transportAddresses.split(",")
>>>         return addresses
>>>             .filter { address -> address.isNotEmpty() }
>>>             .map { address ->
>>>                 HttpHost.create(address)
>>>             }
>>>     }
>>>
>>>     private fun getHost(element: JsonObject): String {
>>>         return element
>>>             .get("route").asJsonObject
>>>             .get("hosts").asString
>>>     }
>>>
>>>     private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>             indexer.add(request(element))
>>>         }
>>>
>>>         private fun request(element: JsonObject): IndexRequest {
>>>             // Access routing information
>>>             val index = element
>>>                 .get("route").asJsonObject
>>>                 .get("index").asString
>>>
>>>             // Strip off routing information
>>>             element.remove("route")
>>>
>>>             // Send the request
>>>             return Requests.indexRequest()
>>>                 .index(index)
>>>                 .type("_doc")
>>>                 .source(mapOf(
>>>                     "data" to "$element"
>>>                 ))
>>>         }
>>>     }
>>> }
>>>
>>> After running an integration test, I keep running into the following
>>> error during the invocation of the child sink:
>>>
>>> // The runtime context has not been initialized.
>>> sink.invoke(element, context)
>>>
>>> I can see the underlying sink getting initialized, the open call being
>>> made, etc. However, for some reason it looks like there's an issue related
>>> to the context during the invoke call, namely *"The runtime context has
>>> not been initialized"*. I had assumed this would be alright since the
>>> context for the "wrapper" should have already been initialized, but maybe
>>> there's something that I'm missing.
>>>
>>> Also, please forgive any hastily written or nasty code as this is purely
>>> a POC to see if I could get this to work using a single object. I have
>>> hopes of cleaning it up and genericizing it after I am confident that it
>>> actually works.
>>>
>>> Thanks so much again,
>>>
>>> Rion
>>>
>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> Sorry for the late reply, I've missed your previous message. Thanks Arvid
>>>> for the reminder <3.
>>>>
>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>> connection from the ConfigurationT element and handle caching it and
>>>>> then just grab the element and send it on its way?
>>>>
>>>> Yes, this is exactly what I had in mind.
>>>> There should be almost no overhead as the sink can be easily chained with
>>>> your join (KeyedCoProcessFunction) function.
>>>>
>>>>> The shape of the elements being evicted from the process function
>>>>> (Is a simple wrapper with the configuration for the sink enough here?
>>>>> Do I need to explicitly initialize the sink within this function? Etc.)
>>>>
>>>> To write an element you need a configuration for the destination
>>>> and the element itself, so a tuple of *(ElasticConfiguration, Element)*
>>>> should be enough (that's basically your
>>>> MessageWrapper<ElementT, ConfigurationT> class).
>>>>
>>>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>> just be something like an
>>>>> *.addSink(DynamicElasticSearchSink<String, Configuration>())*
>>>>> or perhaps something else entirely?)
>>>>
>>>> I guess it could look something like the snippet below. It would
>>>> definitely be good to play around with the *DynamicElasticSearchSink* API
>>>> and make it more meaningful / user friendly (the gist I've shared was just
>>>> a very rough prototype to showcase the idea).
>>>>
>>>> static class Destination {
>>>>
>>>>     private final List<HttpHost> httpHosts;
>>>>
>>>>     Destination(List<HttpHost> httpHosts) {
>>>>         this.httpHosts = httpHosts;
>>>>     }
>>>> }
>>>>
>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>> toWrite.addSink(
>>>>     new DynamicElasticsearchSink<>(
>>>>         new SinkRouter<
>>>>                 Tuple2<Destination, String>,
>>>>                 String,
>>>>                 ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>
>>>>             @Override
>>>>             public String getRoute(Tuple2<Destination, String> element) {
>>>>                 // Construct a deterministic unique caching key for the
>>>>                 // destination... (this could be cheaper if you know the data)
>>>>                 return element.f0.httpHosts.stream()
>>>>                         .map(HttpHost::toHostString)
>>>>                         .collect(Collectors.joining(","));
>>>>             }
>>>>
>>>>             @Override
>>>>             public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>                     String cacheKey, Tuple2<Destination, String> element) {
>>>>                 return new ElasticsearchSink.Builder<>(
>>>>                         element.f0.httpHosts,
>>>>                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>                                 (el, ctx, indexer) -> {
>>>>                                     // Construct index request.
>>>>                                     final IndexRequest request = ...;
>>>>                                     indexer.add(request);
>>>>                                 })
>>>>                         .build();
>>>>             }
>>>>         }));
>>>>
>>>> I hope this helps ;)
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>
>>>>> Since this will vary depending on the elements retrieved from a
>>>>> separate stream, I'm guessing something like the following would be
>>>>> roughly the avenue to continue down:
>>>>>
>>>>> fun main(args: Array<String>) {
>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>
>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>     val connectionStream = stream
>>>>>         .fromSource(
>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>             "elastic-configs"
>>>>>         )
>>>>>
>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>     stream
>>>>>         .fromSource(
>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>             "messages"
>>>>>         )
>>>>>         .keyBy { message ->
>>>>>             // Key by the tenant in the message
>>>>>             message.getTenant()
>>>>>         }
>>>>>         .connect(
>>>>>             // Connect the messages stream with the configurations
>>>>>             connectionStream
>>>>>         )
>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>             lateinit var configState: ValueState<String>
>>>>>
>>>>>             override fun open(parameters: Configuration) {
>>>>>                 super.open(parameters)
>>>>>                 // Initialize the states
>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>             }
>>>>>
>>>>>             // When an element is received
>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>                 // Check if we have a mapping
>>>>>                 if (configState.value() == null) {
>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>                 } else {
>>>>>                     // Output the record with some indicator of the route?
>>>>>                     out.collect(message)
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>                 // records in state
>>>>>                 if (config.getTenant() == context.currentKey) {
>>>>>                     configState.update(config)
>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>                     messagesToFlush.forEach { message ->
>>>>>                         out.collect(message)
>>>>>                     }
>>>>>                     // Clear the buffer so the records are not emitted again
>>>>>                     messagesAwaitingConfigState.clear()
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             // State descriptors
>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>                 "messages-awaiting-config",
>>>>>                 TypeInformation.of(String::class.java)
>>>>>             )
>>>>>
>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>                 "elastic-config",
>>>>>                 TypeInformation.of(String::class.java)
>>>>>             )
>>>>>         })
>>>>>
>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>> }
>>>>>
>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>> corresponding configuration (to avoid race conditions). However, I'm
>>>>> guessing what will happen here is rather than directly outputting the
>>>>> messages from this process function, I'd construct some type of wrapper
>>>>> here with the necessary routing/configuration for the message (obtained
>>>>> via the configuration stream) along with the element, which might be
>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>> connection from the ConfigurationT element and handle caching it and
>>>>> then just grab the element and send it on its way?
>>>>>
>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>
>>>>> 1. The shape of the elements being evicted from the process
>>>>>    function (Is a simple wrapper with the configuration for the sink
>>>>>    enough here? Do I need to explicitly initialize the sink within this
>>>>>    function? Etc.)
>>>>> 2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>
>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>> appreciated.
>>>>>
>>>>> Rion
>>>>>
>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>>>
>>>>>> To give you a better idea, at a high level I think it could look
>>>>>> something like this [1].
>>>>>>
>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>> think the number of clusters shouldn’t be too large (and will either
>>>>>>> rarely change or have new entries come in at runtime, but it needs to
>>>>>>> support that).
>>>>>>>
>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>> not sure what the dynamic one would look like at the moment though,
>>>>>>> perhaps that’s something you’d be able to advise on?
>>>>>>>
>>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>>> have the mappings stored in state, is it possible that within the open()
>>>>>>> function for this dynamic route to access the state and initialize the
>>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>>> clusters and dynamically handling indices)?
>>>>>>>
>>>>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>>>>> sink as well, although I’m far from an Elastic expert. If you point me
>>>>>>> in the right direction, I may be able to help out.
>>>>>>>
>>>>>>> Thanks much!
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi Rion,
>>>>>>>
>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>> implement your own ElasticsearchSinkFunction [1], where you can create
>>>>>>> any request that the elastic client supports.
>>>>>>>
>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>> clusters.
>>>>>>> - If the elastic clusters are known upfront (before submitting the job),
>>>>>>>   you can easily create multiple elastic sinks and prepend them with a
>>>>>>>   simple filter (this is basically what the split operator does).
>>>>>>> - If you discover elastic clusters at runtime, this would require
>>>>>>>   some changes to the current ElasticsearchSink implementation. I think
>>>>>>>   this may be actually as simple as introducing something like
>>>>>>>   DynamicElasticsearchSink, that could dynamically create and manage
>>>>>>>   "child" sinks. This would probably require some thoughts about how to
>>>>>>>   manage consumed resources (memory), because the number of child sinks
>>>>>>>   could be potentially unbounded. This could be of course simplified if
>>>>>>>   the underlying elastic client already supports that, which I'm not
>>>>>>>   aware of. If you'd like to take this path, it would definitely be a
>>>>>>>   great contribution to Flink (I'm able to provide some guidance).
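
One possible way to keep the number of child sinks bounded, as raised in the second bullet above, is a small LRU cache that closes the least recently used sink when a limit is exceeded. The following is only a sketch under assumptions: ChildSink and BoundedSinkCache are hypothetical names (not part of Flink or the Elasticsearch connector), and a real implementation would have to make sure pending requests are flushed before a sink is evicted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical minimal sink contract; not a Flink interface.
interface ChildSink extends AutoCloseable {
    void write(String element);

    @Override
    void close();
}

// Access-ordered LRU cache that closes the least recently used sink on overflow.
class BoundedSinkCache {
    private final int maxSinks;
    private final Function<String, ChildSink> factory;
    private final LinkedHashMap<String, ChildSink> sinks;

    BoundedSinkCache(int maxSinks, Function<String, ChildSink> factory) {
        this.maxSinks = maxSinks;
        this.factory = factory;
        // accessOrder = true: iteration order goes from least to most recently used.
        this.sinks = new LinkedHashMap<String, ChildSink>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, ChildSink> eldest) {
                if (size() > BoundedSinkCache.this.maxSinks) {
                    eldest.getValue().close(); // release resources before dropping
                    return true;
                }
                return false;
            }
        };
    }

    // Returns the cached sink for a route, creating it on first use.
    ChildSink sinkFor(String route) {
        ChildSink sink = sinks.get(route); // get() refreshes recency
        if (sink == null) {
            sink = factory.apply(route);
            sinks.put(route, sink); // may evict and close the eldest entry
        }
        return sink;
    }

    int size() {
        return sinks.size();
    }

    // Closes every remaining sink, e.g. when the wrapping sink is closed.
    void closeAll() {
        sinks.values().forEach(ChildSink::close);
        sinks.clear();
    }
}
```

The trade-off is that a tenant whose sink was evicted pays the connection setup cost again on its next record, so the limit would need to be sized against the expected number of active clusters.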
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in
>>>>>>>> Flink (or if something downstream might be a better option).
>>>>>>>>
>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>
>>>>>>>> - A stream of tenant-specific events
>>>>>>>> - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>   Elastic cluster information (as each tenant has its own specific
>>>>>>>>   Elastic cluster/index)
>>>>>>>>
>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>
>>>>>>>> 1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster
>>>>>>>>    info as the value)
>>>>>>>> 2. The event stream will be keyed by tenant and connected to
>>>>>>>>    the cluster mappings stream.
>>>>>>>> 3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>    tenant-specific event data to its corresponding cluster/index from
>>>>>>>>    the mapping source.
>>>>>>>>
>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster
>>>>>>>> like I would need on a per-tenant basis or if there’s a better approach
>>>>>>>> here?
>>>>>>>>
>>>>>>>> Any advice would be appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Rion
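
The three steps in the original question (keep per-tenant cluster mappings, join events with them by tenant, emit routed records for the sink) can be sketched without any framework. This is only an illustration under assumptions: TenantRouter is a hypothetical class, plain maps stand in for Flink keyed state, and cluster configurations and events are reduced to strings. Each output pair is (cluster config, event).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free stand-in for per-tenant keyed state (hypothetical class).
class TenantRouter {
    private final Map<String, String> configByTenant = new HashMap<>();
    private final Map<String, List<String>> pendingByTenant = new HashMap<>();

    // Called for every event; buffers it while the tenant has no mapping yet.
    List<Map.Entry<String, String>> onEvent(String tenant, String event) {
        String config = configByTenant.get(tenant);
        if (config == null) {
            pendingByTenant.computeIfAbsent(tenant, t -> new ArrayList<>()).add(event);
            return List.of();
        }
        return List.of(Map.entry(config, event));
    }

    // Called for every mapping update; stores it and flushes buffered events once.
    List<Map.Entry<String, String>> onConfig(String tenant, String config) {
        configByTenant.put(tenant, config);
        // remove() empties the buffer, so a later update does not re-emit old events.
        List<String> pending = pendingByTenant.remove(tenant);
        List<Map.Entry<String, String>> out = new ArrayList<>();
        if (pending != null) {
            for (String event : pending) {
                out.add(Map.entry(config, event));
            }
        }
        return out;
    }
}
```

In a Flink job the two maps would presumably become keyed state inside a co-process function, and the emitted pairs are what a routing sink would consume.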