This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-18580/elastic-async-producer in repository https://gitbox.apache.org/repos/asf/camel.git
commit fd3133751204600b3c9fd83201c57d5a317903c2 Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Mon Oct 3 18:54:35 2022 +0200 CAMEL-18580: camel-elasticsearch - Propose an async producer --- .../camel/component/es/ElasticsearchProducer.java | 543 ++++++++++++++------- 1 file changed, 380 insertions(+), 163 deletions(-) diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java index 51f62962a2c..5cafd7933c5 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java @@ -22,29 +22,42 @@ import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; +import java.util.concurrent.CompletableFuture; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.WriteResponseBase; import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.DeleteRequest; +import co.elastic.clients.elasticsearch.core.DeleteResponse; import co.elastic.clients.elasticsearch.core.GetRequest; import co.elastic.clients.elasticsearch.core.IndexRequest; import co.elastic.clients.elasticsearch.core.MgetRequest; +import co.elastic.clients.elasticsearch.core.MgetResponse; import co.elastic.clients.elasticsearch.core.MsearchRequest; +import co.elastic.clients.elasticsearch.core.MsearchResponse; import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; import co.elastic.clients.elasticsearch.core.UpdateRequest; +import co.elastic.clients.elasticsearch.core.UpdateResponse; import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest; +import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse; import co.elastic.clients.elasticsearch.indices.ExistsRequest; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.endpoints.BooleanResponse; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.support.DefaultProducer; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.util.IOHelper; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -64,13 +77,14 @@ import static org.apache.camel.component.es.ElasticsearchConstants.PARAM_SCROLL_ /** * Represents an Elasticsearch producer. */ -public class ElasticsearchProducer extends DefaultProducer { +class ElasticsearchProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class); protected final ElasticsearchConfiguration configuration; + private final Object mutex = new Object(); private volatile RestClient client; - private volatile Sniffer sniffer; + private Sniffer sniffer; public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) { super(endpoint); @@ -116,188 +130,338 @@ public class ElasticsearchProducer extends DefaultProducer { } if (operationConfig == null) { throw new IllegalArgumentException( - ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported"); + ElasticsearchConstants.PARAM_OPERATION + " value is mandatory"); } return operationConfig; } @Override - public void process(Exchange exchange) throws Exception { - if (configuration.isDisconnect() && client == null) { - startClient(); - } - final ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); - ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper)); - ElasticsearchClient esClient = new ElasticsearchClient(transport); - // 2. Index and type will be set by: - // a. If the incoming body is already an action request - // b. If the body is not an action request we will use headers if they - // are set. - // c. If the body is not an action request and the headers aren't set we - // will use the configuration. - // No error is thrown by the component in the event none of the above - // conditions are met. The java es client - // will throw. - - Message message = exchange.getIn(); - final ElasticsearchOperation operation = resolveOperation(exchange); - - // Set the index/type headers on the exchange if necessary. This is used - // for type conversion. - boolean configIndexName = false; - String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class); - if (indexName == null) { - message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName()); - configIndexName = true; - } + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + if (configuration.isDisconnect() && client == null) { + startClient(); + } + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper)); + // 2. Index and type will be set by: + // a. If the incoming body is already an action request + // b. If the body is not an action request we will use headers if they + // are set. + // c. If the body is not an action request and the headers aren't set we + // will use the configuration. + // No error is thrown by the component in the event none of the above + // conditions are met. The java es client + // will throw. - Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class); - if (size == null) { - message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize()); - } + Message message = exchange.getIn(); + final ElasticsearchOperation operation = resolveOperation(exchange); - Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class); - if (from == null) { - message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom()); - } + // Set the index/type headers on the exchange if necessary. This is used + // for type conversion. + boolean configIndexName = false; + String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class); + if (indexName == null) { + message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName()); + configIndexName = true; + } - boolean configWaitForActiveShards = false; - Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class); - if (waitForActiveShards == null) { - message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards()); - configWaitForActiveShards = true; - } + Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class); + if (size == null) { + message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize()); + } - Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class); - if (documentClass == null) { - documentClass = configuration.getDocumentClass(); - } + Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class); + if (from == null) { + message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom()); + } - switch (operation) { - case Index: { - IndexRequest.Builder<?> indexRequestBuilder = message.getBody(IndexRequest.Builder.class); - message.setBody(esClient.index(indexRequestBuilder.build()).id()); - break; + boolean configWaitForActiveShards = false; + Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class); + if (waitForActiveShards == null) { + message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards()); + configWaitForActiveShards = true; } - case Update: { - UpdateRequest.Builder updateRequestBuilder = message.getBody(UpdateRequest.Builder.class); - message.setBody(esClient.update(updateRequestBuilder.build(), documentClass).id()); - break; + + Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class); + if (documentClass == null) { + documentClass = configuration.getDocumentClass(); } - case GetById: { - GetRequest.Builder getRequestBuilder = message.getBody(GetRequest.Builder.class); - if (getRequestBuilder == null) { - throw new IllegalArgumentException( - "Wrong body type. Only String or GetRequest.Builder is allowed as a type"); + + ActionContext ctx = new ActionContext(exchange, callback, transport, configIndexName, configWaitForActiveShards); + + switch (operation) { + case Index: { + processIndexAsync(ctx); + break; } - message.setBody(esClient.get(getRequestBuilder.build(), documentClass)); - break; - } - case Bulk: { - BulkRequest.Builder bulkRequestBuilder = message.getBody(BulkRequest.Builder.class); - if (bulkRequestBuilder == null) { - throw new IllegalArgumentException( - "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type"); + case Update: { + processUpdateAsync(ctx, documentClass); + break; } - message.setBody(esClient.bulk(bulkRequestBuilder.build()).items()); - break; - } - case Delete: { - DeleteRequest.Builder deleteRequestBuilder = message.getBody(DeleteRequest.Builder.class); - if (deleteRequestBuilder == null) { - throw new IllegalArgumentException( - "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type"); + case GetById: { + processGetByIdAsync(ctx, documentClass); + break; } - message.setBody(esClient.delete(deleteRequestBuilder.build()).result()); - break; - } - case DeleteIndex: { - DeleteIndexRequest.Builder deleteIndexRequestBuilder = message.getBody(DeleteIndexRequest.Builder.class); - if (deleteIndexRequestBuilder == null) { - throw new IllegalArgumentException( - "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type"); + case Bulk: { + processBulkAsync(ctx); + break; } - message.setBody(esClient.indices().delete(deleteIndexRequestBuilder.build()).acknowledged()); - break; - } - case Exists: { - ExistsRequest.Builder builder = new ExistsRequest.Builder(); - builder.index(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class)); - message.setBody(esClient.indices().exists(builder.build()).value()); - break; - } - case Search: { - SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class); - if (searchRequestBuilder == null) { - throw new IllegalArgumentException( - "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type"); + case Delete: { + processDeleteAsync(ctx); + break; } - // is it a scroll request ? - boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class); - if (useScroll) { - int scrollKeepAliveMs - = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(), - Integer.class); - ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>( - searchRequestBuilder, esClient, scrollKeepAliveMs, exchange, documentClass); - exchange.getIn().setBody(scrollRequestIterator); - } else { - message.setBody(esClient.search(searchRequestBuilder.build(), documentClass).hits()); + case DeleteIndex: { + processDeleteIndexAsync(ctx); + break; } - break; - } - case MultiSearch: { - MsearchRequest.Builder msearchRequestBuilder = message.getBody(MsearchRequest.Builder.class); - if (msearchRequestBuilder == null) { - throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type"); + case Exists: { + processExistsAsync(ctx); + break; } - message.setBody(esClient.msearch(msearchRequestBuilder.build(), documentClass).responses()); - break; - } - case MultiGet: { - MgetRequest.Builder mgetRequestBuilder = message.getBody(MgetRequest.Builder.class); - if (mgetRequestBuilder == null) { - throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type"); + case Search: { + SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class); + if (searchRequestBuilder == null) { + throw new IllegalArgumentException( + "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type"); + } + // is it a scroll request ? + boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class); + if (useScroll) { + // As a scroll request is expected, for the sake of simplicity, the synchronous mode is preserved + int scrollKeepAliveMs + = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(), + Integer.class); + ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>( + searchRequestBuilder, new ElasticsearchClient(transport), scrollKeepAliveMs, exchange, + documentClass); + exchange.getIn().setBody(scrollRequestIterator); + cleanup(ctx); + callback.done(true); + return true; + } else { + onComplete( + new ElasticsearchAsyncClient(transport).search(searchRequestBuilder.build(), documentClass) + .thenApply(SearchResponse::hits), + ctx); + } + break; + } + case MultiSearch: { + processMultiSearchAsync(ctx, documentClass); + break; + } + case MultiGet: { + processMultiGetAsync(ctx, documentClass); + break; + } + case Ping: { + processPingAsync(ctx); + break; + } + default: { + throw new IllegalArgumentException( + ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported"); } - message.setBody(esClient.mget(mgetRequestBuilder.build(), documentClass).docs()); - break; - } - case Ping: { - message.setBody(esClient.ping().value()); - break; - } - default: { - throw new IllegalArgumentException( - ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported"); } + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; } + return false; + } + + /** + * Executes asynchronously a ping to the Elastic cluster. + */ + private void processPingAsync(ActionContext ctx) { + onComplete( + ctx.getClient().ping() + .thenApply(BooleanResponse::value), + ctx); + } + + /** + * Executes asynchronously a multi-get request. + */ + private void processMultiGetAsync(ActionContext ctx, Class<?> documentClass) { + MgetRequest.Builder mgetRequestBuilder = ctx.getMessage().getBody(MgetRequest.Builder.class); + if (mgetRequestBuilder == null) { + throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type"); + } + onComplete( + ctx.getClient().mget(mgetRequestBuilder.build(), documentClass) + .thenApply(MgetResponse::docs), + ctx); + } - // If we set params via the configuration on this exchange, remove them - // now. This preserves legacy behavior for this component and enables a - // use case where one message can be sent to multiple elasticsearch - // endpoints where the user is relying on the endpoint configuration - // (index/type) rather than header values. If we do not clear this out - // sending the same message (index request, for example) to multiple - // elasticsearch endpoints would have the effect overriding any - // subsequent endpoint index/type with the first endpoint index/type. - if (configIndexName) { - message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME); + /** + * Executes asynchronously a multi-search request. + */ + private void processMultiSearchAsync(ActionContext ctx, Class<?> documentClass) { + MsearchRequest.Builder msearchRequestBuilder = ctx.getMessage().getBody(MsearchRequest.Builder.class); + if (msearchRequestBuilder == null) { + throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type"); } + onComplete( + ctx.getClient().msearch(msearchRequestBuilder.build(), documentClass) + .thenApply(MsearchResponse::responses), + ctx); + } + + /** + * Checks asynchronously if a given index exists. + */ + private void processExistsAsync(ActionContext ctx) { + ExistsRequest.Builder builder = new ExistsRequest.Builder(); + builder.index(ctx.getMessage().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class)); + onComplete( + ctx.getClient().indices().exists(builder.build()) + .thenApply(BooleanResponse::value), + ctx); + } - if (configWaitForActiveShards) { - message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS); + /** + * Deletes asynchronously an index. + */ + private void processDeleteIndexAsync(ActionContext ctx) { + DeleteIndexRequest.Builder deleteIndexRequestBuilder = ctx.getMessage().getBody(DeleteIndexRequest.Builder.class); + if (deleteIndexRequestBuilder == null) { + throw new IllegalArgumentException( + "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type"); } - if (configuration.isDisconnect()) { - IOHelper.close(transport); - IOHelper.close(client); - client = null; - if (configuration.isEnableSniffer()) { - IOHelper.close(sniffer); - sniffer = null; - } + onComplete( + ctx.getClient().indices().delete(deleteIndexRequestBuilder.build()) + .thenApply(DeleteIndexResponse::acknowledged), + ctx); + } + + /** + * Deletes asynchronously a document. + */ + private void processDeleteAsync(ActionContext ctx) { + DeleteRequest.Builder deleteRequestBuilder = ctx.getMessage().getBody(DeleteRequest.Builder.class); + if (deleteRequestBuilder == null) { + throw new IllegalArgumentException( + "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type"); + } + onComplete( + ctx.getClient().delete(deleteRequestBuilder.build()) + .thenApply(DeleteResponse::result), + ctx); + } + + /** + * Executes asynchronously bulk operations. + */ + private void processBulkAsync(ActionContext ctx) { + BulkRequest.Builder bulkRequestBuilder = ctx.getMessage().getBody(BulkRequest.Builder.class); + if (bulkRequestBuilder == null) { + throw new IllegalArgumentException( + "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type"); } + onComplete( + ctx.getClient().bulk(bulkRequestBuilder.build()) + .thenApply(BulkResponse::items), + ctx); + } + + /** + * Finds asynchronously a document by id. + */ + private void processGetByIdAsync(ActionContext ctx, Class<?> documentClass) { + GetRequest.Builder getRequestBuilder = ctx.getMessage().getBody(GetRequest.Builder.class); + if (getRequestBuilder == null) { + throw new IllegalArgumentException( + "Wrong body type. Only String or GetRequest.Builder is allowed as a type"); + } + onComplete( + ctx.getClient().get(getRequestBuilder.build(), documentClass), + ctx); + } + + /** + * Updates asynchronously a document. + */ + private void processUpdateAsync(ActionContext ctx, Class<?> documentClass) { + UpdateRequest.Builder updateRequestBuilder = ctx.getMessage().getBody(UpdateRequest.Builder.class); + onComplete( + ctx.getClient().update(updateRequestBuilder.build(), documentClass) + .thenApply(r -> ((UpdateResponse<?>) r).id()), + ctx); + } + + /** + * Indexes asynchronously a document. + */ + private void processIndexAsync(ActionContext ctx) { + IndexRequest.Builder<?> indexRequestBuilder = ctx.getMessage().getBody(IndexRequest.Builder.class); + onComplete( + ctx.getClient().index(indexRequestBuilder.build()) + .thenApply(WriteResponseBase::id), + ctx); + } + + /** + * Add actions to perform once the given future is complete. + * + * @param future the future to complete with specific actions. + * @param ctx the context of the asynchronous task. + * @param <T> the result type returned by the future. + */ + private <T> void onComplete(CompletableFuture<T> future, ActionContext ctx) { + final Exchange exchange = ctx.getExchange(); + future.thenAccept(r -> exchange.getIn().setBody(r)) + .thenAccept(r -> cleanup(ctx)) + .whenComplete( + (r, e) -> { + try { + if (e != null) { + exchange.setException(new CamelExchangeException( + "An error occurred while executing the action", exchange, e)); + } + } finally { + ctx.getCallback().done(false); + } + }); + } + + /** + * The cleanup task to execute once everything is done. + */ + private void cleanup(ActionContext ctx) { + + try { + Message message = ctx.getMessage(); + + // If we set params via the configuration on this exchange, remove them + // now. This preserves legacy behavior for this component and enables a + // use case where one message can be sent to multiple elasticsearch + // endpoints where the user is relying on the endpoint configuration + // (index/type) rather than header values. If we do not clear this out + // sending the same message (index request, for example) to multiple + // elasticsearch endpoints would have the effect overriding any + // subsequent endpoint index/type with the first endpoint index/type. + if (ctx.isConfigIndexName()) { + message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME); + } + if (ctx.isConfigWaitForActiveShards()) { + message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS); + } + if (configuration.isDisconnect()) { + IOHelper.close(ctx.getTransport()); + if (configuration.isEnableSniffer()) { + IOHelper.close(sniffer); + sniffer = null; + } + IOHelper.close(client); + client = null; + } + } catch (Exception e) { + LOG.warn("Could not execute the cleanup task", e); + } } @Override @@ -310,12 +474,16 @@ public class ElasticsearchProducer extends DefaultProducer { private void startClient() { if (client == null) { - LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName()); - if (configuration.getHostAddressesList() != null - && !configuration.getHostAddressesList().isEmpty()) { - client = createClient(); - } else { - LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster"); + synchronized (mutex) { + if (client == null) { + LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName()); + if (configuration.getHostAddressesList() != null + && !configuration.getHostAddressesList().isEmpty()) { + client = createClient(); + } else { + LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster"); + } + } } } } @@ -388,4 +556,53 @@ public class ElasticsearchProducer extends DefaultProducer { throw new RuntimeException(e); } } + + /** + * An inner class providing all the information that an asynchronous action could need. + */ + private static class ActionContext { + + private final Exchange exchange; + private final AsyncCallback callback; + private final ElasticsearchTransport transport; + private final boolean configIndexName; + private final boolean configWaitForActiveShards; + + ActionContext(Exchange exchange, AsyncCallback callback, ElasticsearchTransport transport, boolean configIndexName, + boolean configWaitForActiveShards) { + this.exchange = exchange; + this.callback = callback; + this.transport = transport; + this.configIndexName = configIndexName; + this.configWaitForActiveShards = configWaitForActiveShards; + } + + ElasticsearchTransport getTransport() { + return transport; + } + + ElasticsearchAsyncClient getClient() { + return new ElasticsearchAsyncClient(transport); + } + + boolean isConfigIndexName() { + return configIndexName; + } + + boolean isConfigWaitForActiveShards() { + return configWaitForActiveShards; + } + + Exchange getExchange() { + return exchange; + } + + AsyncCallback getCallback() { + return callback; + } + + Message getMessage() { + return exchange.getIn(); + } + } }