Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-09-06 Thread Arvid Heise
Hi Rion, Thank you very much for the contribution. We are currently still busy with the 1.14 release but can pick up the review after that. David, if you have capacity, we would be grateful for any help. In general, we plan to port ES from SinkFunction to the new Sink interface in 1.15 but I

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-09-06 Thread David Morávek
Hi Rion, thanks for opening the PR. I'll take a look at it this week. I'd also pull Arvid into this topic to see whether he has any comments. Best, D. On Sat, Sep 4, 2021 at 9:10 PM Rion Williams wrote: > Hi again David et al, > > I managed to push an initial pull request for the

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-09-04 Thread Rion Williams
Hi again David et al, I managed to push an initial pull request for the implementations of the DynamicElasticsearchSink and related ElasticsearchSinkRouter last week and made some minor updates today with regard to the Javadocs (included code

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-26 Thread David Morávek
Hi Rion, personally I'd start with a unit test in the base module using a test sink implementation. There is already *DummyElasticsearchSink* that you may be able to reuse (just note that we're trying to get rid of Mockito-based tests such as this one). I'm a bit unsure that an integration test would
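This is a rough, stdlib-only Kotlin sketch of the suggestion above: a unit test that uses a hand-written test sink rather than Mockito. `SimpleSink`, `RecordingSink`, and `RoutingSink` are hypothetical names invented for this example. They are not Flink's `SinkFunction`, the existing `DummyElasticsearchSink`, or the classes from the pull request.

```kotlin
// Hypothetical minimal sink contract used only for this sketch.
interface SimpleSink<T> {
    fun write(element: T)
}

// Hand-written test double: records what it receives instead of relying on a mock.
class RecordingSink<T> : SimpleSink<T> {
    val received = mutableListOf<T>()

    override fun write(element: T) {
        received.add(element)
    }
}

// Hypothetical router under test: computes a route key per element and forwards
// the element to the sink created (once) for that key.
class RoutingSink<T>(
    private val routeOf: (T) -> String,
    private val sinkFactory: (String) -> SimpleSink<T>
) : SimpleSink<T> {
    private val sinks = HashMap<String, SimpleSink<T>>()

    override fun write(element: T) {
        val route = routeOf(element)
        sinks.getOrPut(route) { sinkFactory(route) }.write(element)
    }
}
```

Because the test double is plain code, the assertions can inspect the recorded elements directly instead of verifying mock interactions.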

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-26 Thread Rion Williams
Just chiming in on this again. I think I have the pieces in place regarding the implementation (both a DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added to the elasticsearch-base module. I noticed that HttpHost wasn't available within that module/in the tests, so I'd

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-25 Thread Rion Williams
Thanks again David, I've spun up a JIRA issue for the ticket while I work on getting things into the proper state. If someone with the appropriate privileges could assign it to me, I'd be appreciative. I'll likely need some assistance at a few

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-25 Thread David Morávek
AFAIK there are currently no other sources in Flink that can treat "other sources" / "destination" as data. The most complete generic work on this topic that I'm aware of is Apache Beam's Splittable DoFn-based IOs. I think the best module for the contribution would be "elasticsearch-base", because

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-25 Thread Rion Williams
Hi David, That was perfect and it looks like this is working as I'd expected. I put together some larger integration tests for my specific use-case (multiple Elasticsearch clusters running in TestContainers) and verified that messages were being routed dynamically to the appropriate sinks. I

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-24 Thread David Morávek
Hi Rion, you just need to call *sink.setRuntimeContext(getRuntimeContext())* before opening the child sink. Please see *AbstractRichFunction* [1] (which ElasticsearchSink extends) for more details. One more note: instead of starting with an integration test, I'd recommend writing a unit test using
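The ordering described above can be modeled outside Flink to make the failure mode concrete. In this sketch, `RuntimeContext`, `RichFunctionLike`, `ChildSink`, and `DynamicParentSink` are hypothetical stand-ins. They mimic only the relevant part of `AbstractRichFunction`: the context must be injected before it is read. They are not Flink's classes, and the parameters of the real `open` method are omitted here.

```kotlin
// Hypothetical stand-in for Flink's RuntimeContext; only carries a task name here.
data class RuntimeContext(val taskName: String)

// Mimics the contract discussed above: the context must be injected
// via setRuntimeContext() before getRuntimeContext() (or open()) can use it.
abstract class RichFunctionLike {
    private var context: RuntimeContext? = null

    fun setRuntimeContext(ctx: RuntimeContext) {
        context = ctx
    }

    fun getRuntimeContext(): RuntimeContext =
        context ?: throw IllegalStateException("runtime context not set")

    open fun open() {}
}

// Hypothetical per-cluster child sink whose open() reads the runtime context.
class ChildSink(val cluster: String) : RichFunctionLike() {
    var openedIn: String? = null
        private set

    override fun open() {
        openedIn = getRuntimeContext().taskName
    }
}

// Hypothetical parent sink that lazily creates and caches one child per cluster.
class DynamicParentSink : RichFunctionLike() {
    private val children = HashMap<String, ChildSink>()

    fun childFor(cluster: String): ChildSink = children.getOrPut(cluster) {
        ChildSink(cluster).also { child ->
            // Propagate the parent's context first; without this, open() throws.
            child.setRuntimeContext(getRuntimeContext())
            child.open()
        }
    }
}
```

A child created and opened without the propagation step fails in `open()`, which matches the symptom that the `setRuntimeContext` call fixes.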

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread Rion Williams
Hi David, Thanks again for the response. I believe that I'm getting pretty close to at least a POC-level implementation of this. Currently, I'm working with JsonObject instances throughout the pipeline, so I wanted to try this out and simply stored the routing information within the element

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread David Morávek
Hi Rion, Sorry for the late reply; I missed your previous message. Thanks Arvid for the reminder <3. > something like a MessageWrapper and pass those elements to the sink, which would create the tenant-specific Elastic connection from the ConfigurationT element and handle caching it and then
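The approach quoted above wraps each element with its routing configuration and lets the sink create and cache a tenant-specific connection. Here it is as a stdlib-only sketch under stated assumptions. `MessageWrapper`'s field names, `ClusterConfig`, `FakeClient`, and `CachingRoutingSink` are hypothetical, and `FakeClient` stands in for a real Elasticsearch client.

```kotlin
// Hypothetical wrapper pairing an element with the configuration needed to route it.
data class MessageWrapper<ConfigurationT, T>(val configuration: ConfigurationT, val element: T)

// Hypothetical per-tenant cluster configuration (data class, so usable as a map key).
data class ClusterConfig(val tenant: String, val hosts: List<String>)

// Hypothetical stand-in for a real Elasticsearch client connection.
class FakeClient(val config: ClusterConfig) {
    val documents = mutableListOf<String>()

    fun index(doc: String) {
        documents.add(doc)
    }
}

// Creates one client per distinct configuration on first use and reuses the
// cached client for every later element carrying the same configuration.
class CachingRoutingSink<T>(private val connect: (ClusterConfig) -> FakeClient) {
    private val clients = HashMap<ClusterConfig, FakeClient>()

    fun process(message: MessageWrapper<ClusterConfig, T>) {
        val client = clients.getOrPut(message.configuration) { connect(message.configuration) }
        client.index(message.element.toString())
    }

    fun clientCount(): Int = clients.size
}
```

Keying the cache on the whole configuration means a tenant whose hosts change gets a fresh client. A real sink would also need to close every cached client when it shuts down, which this sketch leaves out.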

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-16 Thread Rion Williams
Thanks for this suggestion David, it's extremely helpful. Since this will vary depending on the elements retrieved from a separate stream, I'm guessing something like the following would be roughly the avenue to continue down: fun main(args: Array<String>) { val parameters =

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread David Morávek
To give you a better idea, at a high level I think it could look something like this [1]. [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8 On Fri, Aug 13, 2021 at 2:57 PM Rion Williams wrote: > Hi David, > > Thanks for

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread Rion Williams
Hi David, Thanks for your response! I think there are currently quite a few unknowns on my end in terms of what production loads look like, but I think the number of clusters shouldn’t be too large (and will either rarely change or have new entries come in at runtime, but it needs to support

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread David Morávek
Hi Rion, As you probably already know, for dynamic indices, you can simply implement your own ElasticsearchSinkFunction
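In the Flink connector, a custom `ElasticsearchSinkFunction` sees each element and adds requests to a `RequestIndexer`, so the index name can be computed per element. What follows is a stdlib-only sketch of that idea, not the connector API. `Event`, `IndexRequest`, `indexFor`, and `processElement` are hypothetical, and the indexer is modeled as a plain callback.

```kotlin
// Hypothetical element type: each event names the tenant it belongs to.
data class Event(val tenant: String, val payload: Map<String, Any>)

// Hypothetical stand-in for an Elasticsearch index request.
data class IndexRequest(val index: String, val source: Map<String, Any>)

// Derives the target index from the element itself (one index per tenant).
fun indexFor(event: Event): String = "events-${event.tenant.lowercase()}"

// Mirrors the per-element step of a custom sink function: build a request
// whose index depends on the element and hand it to an indexer callback.
fun processElement(event: Event, indexer: (IndexRequest) -> Unit) {
    indexer(IndexRequest(indexFor(event), event.payload))
}
```

This covers only dynamic indices on a single cluster. Routing to different clusters is the part the rest of this thread addresses.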

Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-08 Thread Rion Williams
Hi folks, I have a use case that I wanted to pose to the mailing list first, since I’m not terribly familiar with the Elasticsearch connector, to make sure I’m not going down the wrong path trying to accomplish this in Flink (or whether something downstream might be a better option). Basically, I have