[ 
https://issues.apache.org/jira/browse/FLINK-25211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-25211:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor question  (was: 
auto-deprioritized-major question stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Elasticsearch connector version compatibility
> ---------------------------------------------
>
>                 Key: FLINK-25211
>                 URL: https://issues.apache.org/jira/browse/FLINK-25211
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Adam Roberts
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> question
>
> Hi there, let's say I'm writing a Flink job that inserts data into
> Elasticsearch, and I'm importing the Elasticsearch REST client in my job.
>  
> Specifically, this is what I have at the moment, although my deployed
> Elasticsearch version is 7.15.1 (more on this shortly):
>  
> {{    <dependency>}}
> {{        <groupId>org.elasticsearch.client</groupId>}}
> {{        <artifactId>elasticsearch-rest-high-level-client</artifactId>}}
> {{        <version>7.10.1</version>}}
> {{    </dependency>}}
>  
> I know this works just fine when deployed against Elasticsearch 7.8.x, but
> since switching to Elasticsearch 7.15.1 I've noticed intermittent
> authorisation issues (401s come back and headers are not added to subsequent
> requests), so we have had to implement retries in our Flink configuration
> and/or in the Flink job itself.
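The retries mentioned above can be sketched in plain Java as a bounded retry on HTTP 401 with a linear backoff. This is a minimal, hypothetical illustration: {{UnauthorizedRetry}}, {{sendWithRetry}} and the {{StatusResponse}} interface are invented for this example and are not part of Flink or the Elasticsearch client. In a real job, similar logic would more likely live in the sink's failure handling (for example a custom {{ActionRequestFailureHandler}} passed to {{ElasticsearchSink.Builder#setFailureHandler}}) or in the REST client setup.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not a Flink or Elasticsearch API): re-sends a request
 * a bounded number of times while it keeps returning HTTP 401.
 */
public final class UnauthorizedRetry {

    /** Minimal response abstraction assumed for this sketch. */
    interface StatusResponse {
        int status();
    }

    static StatusResponse sendWithRetry(
            Callable<StatusResponse> request, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        StatusResponse response = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            response = request.call();
            if (response.status() != 401) {
                return response; // success or a non-auth failure: stop retrying
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMillis * attempt); // linear backoff before re-sending
            }
        }
        return response; // still 401 after all attempts
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated endpoint: the first two calls return 401, the third returns 200.
        StatusResponse r = sendWithRetry(() -> {
            calls[0]++;
            int code = calls[0] < 3 ? 401 : 200;
            return () -> code;
        }, 5, 10);
        System.out.println("status=" + r.status() + " after " + calls[0] + " calls");
        // prints "status=200 after 3 calls"
    }
}
```

Retrying only on 401 (rather than on every error) keeps genuine failures visible; capping the attempts avoids masking a persistent authorisation problem.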
>  
> While there may be an issue with the particular code I'm writing (we have our
> own security plugin), it does raise the question of which Elasticsearch
> versions the Flink connector is known to work with.
>  
> Let's say I upgrade the Elasticsearch REST client to 7.15.1 to match my
> Elasticsearch version, which might seem sensible.
>  
> If I do that, my JobManagers immediately crash because {{TimeValue}} has moved
> to a different package in the Elasticsearch code; see
> [https://github.com/elastic/elasticsearch/commit/68817d7ca29c264b3ea3f766737d81e2ebb4028c#diff-d8695a17187facb254e5ba4b900c7d5a555e5ae5377f38704209e5e064ea630e]
>  
> Specifically we see this at startup:
>  
> {{java.lang.NoClassDefFoundError: org.elasticsearch.common.unit.TimeValue}}
> {{    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.configureFlushInterval(ElasticsearchSinkBase.java:420) ~[?:?]}}
> {{    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.buildBulkProcessor(ElasticsearchSinkBase.java:393) ~[?:?]}}
> {{    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:319) ~[?:?]}}
> {{    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at <unknown class>.call(Unknown Source) ~[?:?]}}
> {{    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at <unknown class>.run(Unknown Source) ~[?:?]}}
> {{    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.2.jar:1.13.2]}}
> {{    at java.lang.Thread.run(Unknown Source) ~[?:?]}}
> {{Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue}}
> {{    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]}}
> {{    at java.lang.ClassLoader.loadClassHelper(Unknown Source) ~[?:?]}}
> {{    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]}}
> (omitted the rest for brevity, although this is still quite long)
>  
> So I thought: fine, what does the documentation say?
>  
> If we look at
> [https://github.com/apache/flink/blob/master/docs/content/docs/connectors/datastream/elasticsearch.md],
> it refers to Elasticsearch 7.5.1.
>  
> And if we look at the current Flink connector code, we see that the old
> package for {{TimeValue}} is still imported there:
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L40]
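One way to confirm the package move described above is to check which {{TimeValue}} class the job's classpath actually provides. The sketch below is a hypothetical diagnostic ({{TimeValueProbe}} is not part of Flink or Elasticsearch). It assumes the relocated class lives at {{org.elasticsearch.core.TimeValue}}, which matches later Elasticsearch clients but should be checked against the linked commit. It uses only {{Class.forName(String, boolean, ClassLoader)}}, loading without initialising, so it is safe to run against any classpath.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical diagnostic: reports which candidate TimeValue classes are
 * loadable. LEGACY is the name the Flink connector imports; MOVED is an
 * assumed post-move location in newer Elasticsearch clients.
 */
public final class TimeValueProbe {

    static final String LEGACY = "org.elasticsearch.common.unit.TimeValue";
    static final String MOVED = "org.elasticsearch.core.TimeValue"; // assumption

    /** Returns true if the named class can be loaded (without initialising it). */
    static boolean isPresent(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Lists the candidate TimeValue class names visible to the given loader. */
    static List<String> locate(ClassLoader loader) {
        List<String> found = new ArrayList<>();
        for (String name : new String[] {LEGACY, MOVED}) {
            if (isPresent(name, loader)) {
                found.add(name);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        ClassLoader loader = TimeValueProbe.class.getClassLoader();
        List<String> found = locate(loader);
        if (!found.contains(LEGACY)) {
            // The connector references LEGACY, so opening the sink would fail
            // with NoClassDefFoundError on this classpath.
            System.out.println("Missing " + LEGACY + " (the connector would fail at open)");
        }
        System.out.println("TimeValue classes on classpath: " + found);
    }
}
```

Run it with the same jars as the job (for example, the job's fat jar on the classpath). If only the relocated name is reported, the connector's reference to the legacy class will fail in the same way as the stack trace in this issue.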
>  
> This leads me to this JIRA issue: is the current documentation accurate in
> saying that the Flink connector can only be used reliably with Elasticsearch
> versions up to 7.5.1?
>  
> What is the "lay of the land" between the Flink community and Elastic? Is
> this an area that's actively being looked into, or is the general consensus
> that one should write their own connector?
>  
> Are there plans to raise the supported version? Is it pinned to this version
> for a particular reason?
>  
> Many thanks in advance. We've been using the 7.8 client just fine, but moving
> to test with Elasticsearch 7.15 has come with a few challenges, so I'm hoping
> the Flink community can shed some light in this direction.
>  
> Cheers,



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
