Hi David,

Thanks for your response! There are currently quite a few unknowns on my end in 
terms of what production loads will look like, but I think the number of 
clusters shouldn’t be too large (and will either rarely change or have new 
entries come in at runtime, though the solution needs to support the latter).

I think the dynamic approach might be a good route to explore, with actual 
changes to the Elasticsearch sink as a longer-term option. I’m not sure what 
the dynamic one would look like at the moment, though; perhaps that’s something 
you’d be able to advise on?
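
To make the resource question from your note below concrete: a dynamic sink that lazily creates one “child” client per cluster could bound memory with a small LRU cache that closes evicted clients. A stdlib-only Java sketch of that idea (ClusterClient here is a hypothetical stand-in for a real Elasticsearch client, not a Flink or Elastic API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a per-cluster Elasticsearch client.
class ClusterClient {
    final String endpoint;
    boolean closed = false;
    ClusterClient(String endpoint) { this.endpoint = endpoint; }
    void close() { closed = true; }
}

// LRU cache of per-cluster clients: once maxClients is exceeded, the
// least recently used client is closed and evicted, so the number of
// open connections stays bounded even when clusters are discovered at
// runtime.
class ClientCache {
    private final LinkedHashMap<String, ClusterClient> cache;

    ClientCache(final int maxClients) {
        // accessOrder=true gives LRU iteration order.
        this.cache = new LinkedHashMap<String, ClusterClient>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, ClusterClient> eldest) {
                if (size() > maxClients) {
                    eldest.getValue().close(); // release the evicted client's resources
                    return true;
                }
                return false;
            }
        };
    }

    // Returns the cached client for this endpoint, creating it on first use.
    ClusterClient clientFor(String endpoint) {
        return cache.computeIfAbsent(endpoint, ClusterClient::new);
    }

    int size() { return cache.size(); }
}
```

The same bounding strategy would apply whatever the real client type is; the key point is that eviction must also close the child sink, not just drop the reference.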

Given that all the records are keyed by tenant and I would have the 
mappings stored in state, would it be possible, within the open() function for 
this dynamic route, to access that state and initialize the client there? Or 
maybe there’s some other approach (such as grouping by cluster and handling 
indices dynamically)?
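
For what the state-driven routing might look like conceptually, here is a stdlib-only Java sketch of the connect-and-route logic, with no Flink types: in a real job the two maps would live in keyed state inside something like a KeyedCoProcessFunction, and events arriving before their tenant’s mapping would be buffered in state until it shows up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the connect-and-route logic: mappings arrive on one "stream",
// events on another, both keyed by tenant. Events seen before their
// tenant's mapping are buffered; routing decisions are recorded as
// "endpoint|event" strings for illustration.
class TenantRouter {
    private final Map<String, String> clusterByTenant = new HashMap<>(); // tenant -> cluster endpoint
    private final Map<String, List<String>> pending = new HashMap<>();   // tenant -> buffered events
    private final List<String> routed = new ArrayList<>();               // routing decisions

    // Mapping-stream element: remember the cluster and flush any buffered events.
    void onMapping(String tenant, String endpoint) {
        clusterByTenant.put(tenant, endpoint);
        for (String event : pending.getOrDefault(tenant, List.of())) {
            routed.add(endpoint + "|" + event);
        }
        pending.remove(tenant);
    }

    // Event-stream element: route immediately if the mapping is known, else buffer.
    void onEvent(String tenant, String event) {
        String endpoint = clusterByTenant.get(tenant);
        if (endpoint != null) {
            routed.add(endpoint + "|" + event);
        } else {
            pending.computeIfAbsent(tenant, t -> new ArrayList<>()).add(event);
        }
    }

    List<String> routedEvents() { return routed; }
}
```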

I’d be happy to take a shot at making the appropriate changes to the sink as 
well, although I’m far from an Elastic expert. If you point me in the right 
direction, I may be able to help out.
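
As a small concrete piece of that, the per-record index name a custom sink function computes could be derived like this; the "events-<tenant>-<yyyy-MM>" scheme is just an illustrative assumption, not anything the connector prescribes:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Derives a per-record index name from the tenant and event timestamp,
// e.g. "events-acme-2021-08". In a custom sink function this value would
// be set on the per-element index request; the naming scheme is made up
// for illustration.
final class IndexNames {
    private static final DateTimeFormatter MONTH =
            DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

    static String indexFor(String tenant, Instant eventTime) {
        // Elasticsearch index names must be lowercase.
        return "events-" + tenant.toLowerCase(Locale.ROOT) + "-" + MONTH.format(eventTime);
    }
}
```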

Thanks much!

Rion

> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
> 
> 
> Hi Rion,
> 
> As you probably already know, for dynamic indices you can simply implement 
> your own ElasticsearchSinkFunction [1], where you can create any request that 
> the Elastic client supports.
> 
> The tricky part is how to implement dynamic routing into multiple clusters. 
> - If the Elastic clusters are known upfront (before submitting the job), you 
> can easily create multiple Elastic sinks and prepend each with a simple 
> filter (this is basically what a split operator does).
> - If you discover Elastic clusters at runtime, this would require some 
> changes to the current ElasticsearchSink implementation. I think this may 
> actually be as simple as introducing something like a DynamicElasticsearchSink 
> that could dynamically create and manage "child" sinks. This would probably 
> require some thought about how to manage consumed resources (memory), 
> because the number of child sinks is potentially unbounded. This could of 
> course be simplified if the underlying Elastic client already supports this, 
> though I'm not aware that it does. If you'd like to take this path, it would 
> definitely be a great contribution to Flink (I'm able to provide some guidance). 
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
> 
> Best,
> D.
> 
>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:
>> Hi folks,
>> 
>> I have a use-case that I wanted to initially pose to the mailing list, as I’m 
>> not terribly familiar with the Elasticsearch connector and want to ensure I’m 
>> not going down the wrong path trying to accomplish this in Flink (or whether 
>> something downstream might be a better option).
>> 
>> Basically, I have the following pieces to the puzzle:
>> - A stream of tenant-specific events
>> - An HTTP endpoint containing mappings for tenant-specific Elastic cluster 
>> information (as each tenant has its own specific Elastic cluster/index)
>> 
>> What I’m hoping to accomplish is the following:
>> - One stream will periodically poll the HTTP endpoint and store these cluster 
>> mappings in state (keyed by tenant, with cluster info as the value)
>> - The event stream will be keyed by tenant and connected to the cluster 
>> mappings stream.
>> - I’ll need an Elasticsearch sink that can route the tenant-specific event 
>> data to its corresponding cluster/index from the mapping source.
>> 
>> I know that the existing Elasticsearch sink supports dynamic indices, but I 
>> didn’t know if it’s possible to adjust the cluster like I would need on a 
>> per-tenant basis, or if there’s a better approach here? 
>> 
>> Any advice would be appreciated.
>> 
>> Thanks,
>> 
>> Rion
