This is an automated email from the ASF dual-hosted git repository. ilgrosso pushed a commit to branch 3_0_X in repository https://gitbox.apache.org/repos/asf/syncope.git
commit 1342843e30f8a21e2246986da427db28e1835a2c Author: Francesco Chicchiriccò <[email protected]> AuthorDate: Wed Dec 13 08:34:14 2023 +0100 Adjusting flaky test --- .../java/job/ElasticsearchReindex.java | 157 ++++++++++++--------- .../org/apache/syncope/fit/core/AuditITCase.java | 7 +- 2 files changed, 96 insertions(+), 68 deletions(-) diff --git a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java index c11a9f4c2d..18b5d7e509 100644 --- a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java +++ b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java @@ -19,11 +19,18 @@ package org.apache.syncope.core.provisioning.java.job; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester; +import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener; +import co.elastic.clients.elasticsearch._types.ErrorCause; import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.indices.IndexSettings; import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.syncope.common.lib.types.AnyTypeKind; import org.apache.syncope.core.persistence.api.dao.AnyDAO; import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO; @@ -44,6 +51,44 @@ import org.springframework.beans.factory.annotation.Autowired; */ public class ElasticsearchReindex extends 
AbstractSchedTaskJobDelegate<SchedTask> { + protected static class ErrorLoggingBulkListener implements BulkListener<Void> { + + protected static final ErrorLoggingBulkListener INSTANCE = new ErrorLoggingBulkListener(); + + @Override + public void beforeBulk( + final long executionId, + final BulkRequest request, + final List<Void> contexts) { + + // do nothing + } + + @Override + public void afterBulk( + final long executionId, + final BulkRequest request, + final List<Void> contexts, + final BulkResponse response) { + + if (response.errors()) { + String details = response.items().stream().map(BulkResponseItem::error). + filter(Objects::nonNull).map(ErrorCause::toString).collect(Collectors.joining(", ")); + LOG.error("Errors found for request {}; details: {}", executionId, details); + } + } + + @Override + public void afterBulk( + final long executionId, + final BulkRequest request, + final List<Void> contexts, + final Throwable failure) { + + LOG.error("Bulk request {} failed", executionId, failure); + } + } + @Autowired protected ElasticsearchClient client; @@ -125,70 +170,58 @@ public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask int users = userDAO.count(); String uindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER); setStatus("Indexing " + users + " users under " + uindex + "..."); - for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - BulkRequest.Builder bulkRequest = new BulkRequest.Builder(); - - for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - bulkRequest.operations(op -> op.index(idx -> idx. - index(uindex). - id(user). 
- document(utils.document(userDAO.find(user))))); - } - try { - BulkResponse response = client.bulk(bulkRequest.build()); - LOG.debug("Index successfully created for {} [{}/{}]: {}", - uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response); - } catch (Exception e) { - LOG.error("Could not create index for {} [{}/{}]: {}", - uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e); + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(uindex). + id(user). + document(utils.document(userDAO.find(user))))); + } } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", uindex, e); } int groups = groupDAO.count(); String gindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP); setStatus("Indexing " + groups + " groups under " + gindex + "..."); - for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - BulkRequest.Builder bulkRequest = new BulkRequest.Builder(); - - for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - bulkRequest.operations(op -> op.index(idx -> idx. - index(gindex). - id(group). - document(utils.document(groupDAO.find(group))))); - } - try { - BulkResponse response = client.bulk(bulkRequest.build()); - LOG.debug("Index successfully created for {} [{}/{}]: {}", - gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response); - } catch (Exception e) { - LOG.error("Could not create index for {} [{}/{}]: {}", - gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e); + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). 
+ maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(gindex). + id(group). + document(utils.document(groupDAO.find(group))))); + } } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", gindex, e); } int anyObjects = anyObjectDAO.count(); String aindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT); setStatus("Indexing " + anyObjects + " any objects under " + aindex + "..."); - for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - BulkRequest.Builder bulkRequest = new BulkRequest.Builder(); - - for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - bulkRequest.operations(op -> op.index(idx -> idx. - index(aindex). - id(anyObject). - document(utils.document(anyObjectDAO.find(anyObject))))); - } - try { - BulkResponse response = client.bulk(bulkRequest.build()); - LOG.debug("Index successfully created for {} [{}/{}]: {}", - aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response); - } catch (Exception e) { - LOG.error("Could not create index for {} [{}/{}]: {}", - aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e); + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(aindex). + id(anyObject). 
+ document(utils.document(anyObjectDAO.find(anyObject))))); + } } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", aindex, e); } indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping()); @@ -196,24 +229,20 @@ public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask int realms = realmDAO.count(); String rindex = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain()); setStatus("Indexing " + realms + " realms under " + rindex + "..."); - for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - BulkRequest.Builder bulkRequest = new BulkRequest.Builder(); - - for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - bulkRequest.operations(op -> op.index(idx -> idx. - index(rindex). - id(realm). - document(utils.document(realmDAO.find(realm))))); - } - try { - BulkResponse response = client.bulk(bulkRequest.build()); - LOG.debug("Index successfully created for {} [{}/{}]: {}", - rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response); - } catch (Exception e) { - LOG.error("Could not create index for {} [{}/{}]: {}", - rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e); + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(rindex). + id(realm). 
+ document(utils.document(realmDAO.find(realm))))); + } } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", rindex, e); } indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping()); diff --git a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java index a3bcd2040d..7e900a4786 100644 --- a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java +++ b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java @@ -686,8 +686,8 @@ public class AuditITCase extends AbstractITCase { root.getActions().add(logicActions.getKey()); REALM_SERVICE.update(root); - int before = AUDIT_SERVICE.search( - new AuditQuery.Builder().type(AuditElements.EventCategoryType.CUSTOM).build()).getTotalCount(); + AuditQuery query = new AuditQuery.Builder().type(AuditElements.EventCategoryType.CUSTOM).build(); + int before = query(query, MAX_WAIT_SECONDS).size(); try { AUDIT_SERVICE.set(buildAuditConf("syncope.audit.[CUSTOM]:[]:[]:[MY_EVENT]:[SUCCESS]", true)); @@ -696,8 +696,7 @@ public class AuditITCase extends AbstractITCase { plainAttr(attrAddReplacePatch("location", "new" + getUUIDString())). build()); - int after = AUDIT_SERVICE.search( - new AuditQuery.Builder().type(AuditElements.EventCategoryType.CUSTOM).build()).getTotalCount(); + int after = query(query, MAX_WAIT_SECONDS).size(); assertEquals(before + 1, after); } finally { AUDIT_SERVICE.set(buildAuditConf("syncope.audit.[CUSTOM]:[]:[]:[MY_EVENT]:[SUCCESS]", false));
