Hello,
I try to use the camel elasticsearch-component to push a list of objects to an
elastic search with bulk operation (camel with spring boot in 4.0.3 version)
After reading the documentation, and search samples, i can't use the
elasticsearch bulk with camel.
Does anyone try the bulk operation with success?
Details :
I push a list of elastic index operations (BulkOperation) without success (code
at the end) without result in my elastic search (elastic stay empty)
I try to send BulkRequest.Builder instead and list of operation to the
elasticsearch-component.
The operations are well in debuger.
The elastic search is well configured (i can push blank document to him).
I have an error :
2024-02-13T15:31:29,258 [Camel (camel-1) thread #8 - Aggregator] INFO
DefaultErrorHandler - Failed delivery for (MessageId:
02649700127A6F9-00000000000004CA on ExchangeId:
02649700127A6F9-00000000000004CA). On delivery attempt: 0 caught:
org.apache.camel.CamelExchangeException: An error occurred while executing the
action. Exchange[02649700127A6F9-00000000000004CA]. Caused by:
[java.util.concurrent.CompletionException - jakarta.json.JsonException: Jackson
exception]
In can transform object my class in json with jackson in the rest of the
project (it require module jackson-datatype-jdk8, andjackson-datatype-jsr310
that are in the classpath)
Code that generates the index operation list
==
public void createElasticsearchBulkExchange(Exchange aggregatedExchange) {
List<Exchange> exchangeOperations =
aggregatedExchange.getMessage().getBody(List.class);
List<BulkOperation> operationsList = new ArrayList<>(200);
for (Exchange exchange : exchangeOperations) { // exchange
operation contains à list of objects to push to elastic
BulkOperation indexOperation = (new
BulkOperation.Builder()).index(operation ->
operation.id(getEntityId(exchange))
.version(getVersion(exchange))
.versionType(VersionType.ExternalGte)
.index(elasticPjConfiguration.getIngestAlias())
.document(exchange.getMessage().getBody(MyObj.class))).build(); //MyObj is th
operationsList.add(indexOperation);
}
aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_INDEX_NAME,
elasticPjConfiguration.getIngestAlias());
aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_DOCUMENT_MODE,
true);
aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_OPERATION,
ElasticsearchOperation.Bulk);
aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS,
MyObj.class);
aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_SIZE,
operationsList.size());
aggregatedExchange.getMessage().setBody(operationsList);
}
Part of the camel road (the elastic search index seams configured well, i have
add empty document in the elastic)
==
.process(elasticsearchIndexUtilsService::createElasticsearchBulkExchange)
.to(elasticsearch("elasticsearch")
.operation(ElasticsearchOperation.Bulk)
.indexName(elasticIndexPbConfigurationManager.getIngestAliasName()))
[bandeau_pagesjaunes3]<https://www.youtube.com/playlist?list=PLlp-0-iXtsZMOsEKlmySbkrj7VNjqUOfH>
[bandeau_partenaires]<https://www.solocal.com>
Les informations contenues dans le présent message sont strictement
confidentielles et ne sont destinées qu'à l'usage de la ou des personne(s) dont
le nom apparaît en qualité de destinataire(s) et de tout autre personne
spécifiquement autorisée à les recevoir. Si vous n'êtes pas la personne à qui
ce message est destiné, nous vous informons qu'il est strictement interdit de
le lire, diffuser, de le distribuer ou d'en faire des copies, totalement ou
partiellement, sur tout support, notamment un support électronique, ou autre.
La présente interdiction s'applique tant au message lui-même qu'aux documents
qui peuvent être joints audit message. Si vous recevez ce message par erreur,
nous vous remercions de bien vouloir le détruire ainsi que toute copie et de
signaler l'erreur à l'envoyeur par retour d'e-mail.