[ 
https://issues.apache.org/jira/browse/FLINK-25756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17480243#comment-17480243
 ] 

Andriy Redko commented on FLINK-25756:
--------------------------------------

[~MartijnVisser] awesome, we are working on the pull request right now 
(targeting master, but looking forward to retargeting it to the new connectors 
home). Early next week we should have something tangible to set up and test. 
Thank you very much for offering to help, much appreciated.

> Dedicated Opensearch connectors
> -------------------------------
>
>                 Key: FLINK-25756
>                 URL: https://issues.apache.org/jira/browse/FLINK-25756
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Andriy Redko
>            Priority: Major
>
> Since Opensearch was forked from Elasticsearch, a few things have changed. 
> The projects are evolving in different directions: Elasticsearch clients up 
> to 7.13.x were able to connect to Opensearch clusters, but since 7.14 they 
> no longer can [1] (Elastic continues to harden its clients so that they 
> connect to Elasticsearch clusters only).
> For example, running current Flink master against Opensearch clusters using 
> the Elasticsearch 7 connectors fails with:
>  
> {noformat}
>  Caused by: ElasticsearchException[Elasticsearch version 6 or more is 
> required]
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2056)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2043)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.notifyListenerDirectly(ListenableFuture.java:113)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.done(ListenableFuture.java:100)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.BaseFuture.set(BaseFuture.java:133)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.onResponse(ListenableFuture.java:139)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$5.onSuccess(RestHighLevelClient.java:2129)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:636)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:376)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:370)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591){noformat}
>  
> With the compatibility mode [2] turned on, it still fails further down the line:
> {noformat}
> Caused by: ElasticsearchException[Invalid or missing tagline [The OpenSearch 
> Project: https://opensearch.org/]]
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2056)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2043)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.notifyListenerDirectly(ListenableFuture.java:113)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.done(ListenableFuture.java:100)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.BaseFuture.set(BaseFuture.java:133)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.onResponse(ListenableFuture.java:139)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$5.onSuccess(RestHighLevelClient.java:2129)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:636)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:376)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:370)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
>  at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted
>  {noformat}
>  
> Arguably, the best way to proceed in this situation is to provide dedicated 
> Opensearch connectors, on par with the Elasticsearch ones.
> [~Sergey Nuyanzin] and I ([~reta]) would be more than happy to make the 
> contribution (and maintain it as well), if there are no objections from 
> committers / PMC members.
> [1] [https://github.com/elastic/elasticsearch/pull/73910]
> [2] 
> [https://opensearch.org/docs/latest/upgrade-to/upgrade-to/#upgrade-elasticsearch-oss]
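
The two failures quoted above can be read as two consecutive checks on the 
cluster's root response: first the reported version, then the tagline. The 
sketch below is only an illustration of that ordering, not the actual 
RestHighLevelClient code. The class and method names are hypothetical, the 
version strings "1.2.4" and "7.10.2" are assumed example values (the second 
standing in for what a cluster might report with compatibility mode on), and 
the real client performs further checks that are omitted here.

```java
import java.util.Optional;

// Hypothetical, simplified model of the client-side product check.
public final class ProductCheckSketch {

    // Tagline a genuine Elasticsearch node returns in its root response.
    public static final String ES_TAGLINE = "You Know, for Search";

    // Returns the rejection message, or empty if both checks pass.
    public static Optional<String> verify(String versionNumber, String tagline) {
        // Check 1: the major version reported by the cluster must be at least 6.
        int major = Integer.parseInt(versionNumber.split("\\.")[0]);
        if (major < 6) {
            return Optional.of("Elasticsearch version 6 or more is required");
        }
        // Check 2: the tagline must match the Elasticsearch one.
        if (!ES_TAGLINE.equals(tagline)) {
            return Optional.of("Invalid or missing tagline [" + tagline + "]");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String openSearchTagline = "The OpenSearch Project: https://opensearch.org/";
        // Assumed native Opensearch version: rejected by the version check.
        System.out.println(verify("1.2.4", openSearchTagline));
        // Assumed compatibility-mode version: passes check 1, fails on the tagline.
        System.out.println(verify("7.10.2", openSearchTagline));
        // A genuine Elasticsearch response passes both checks.
        System.out.println(verify("7.10.2", ES_TAGLINE));
    }
}
```

Under these assumptions, overriding the reported version only moves the 
failure from the first check to the second, which matches the two stack 
traces in the description.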



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
