This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch 3_0_X
in repository https://gitbox.apache.org/repos/asf/syncope.git

commit 1342843e30f8a21e2246986da427db28e1835a2c
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Wed Dec 13 08:34:14 2023 +0100

    Adjusting flaky test
---
 .../java/job/ElasticsearchReindex.java             | 157 ++++++++++++---------
 .../org/apache/syncope/fit/core/AuditITCase.java   |   7 +-
 2 files changed, 96 insertions(+), 68 deletions(-)

diff --git 
a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
 
b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
index c11a9f4c2d..18b5d7e509 100644
--- 
a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
+++ 
b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
@@ -19,11 +19,18 @@
 package org.apache.syncope.core.provisioning.java.job;
 
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
+import co.elastic.clients.elasticsearch._types.ErrorCause;
 import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
 import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.elasticsearch.indices.IndexSettings;
 import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.syncope.common.lib.types.AnyTypeKind;
 import org.apache.syncope.core.persistence.api.dao.AnyDAO;
 import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
@@ -44,6 +51,44 @@ import 
org.springframework.beans.factory.annotation.Autowired;
  */
 public class ElasticsearchReindex extends 
AbstractSchedTaskJobDelegate<SchedTask> {
 
+    protected static class ErrorLoggingBulkListener implements 
BulkListener<Void> {
+        // no instance state: the single INSTANCE below is shared by all ingesters
+        protected static final ErrorLoggingBulkListener INSTANCE = new 
ErrorLoggingBulkListener();
+
+        @Override
+        public void beforeBulk(
+                final long executionId,
+                final BulkRequest request,
+                final List<Void> contexts) {
+
+            // do nothing: only outcomes are logged, by the afterBulk overloads
+        }
+
+        @Override
+        public void afterBulk(
+                final long executionId,
+                final BulkRequest request,
+                final List<Void> contexts,
+                final BulkResponse response) {
+            // on reported errors, join the non-null item error causes in one log line
+            if (response.errors()) {
+                String details = 
response.items().stream().map(BulkResponseItem::error).
+                        
filter(Objects::nonNull).map(ErrorCause::toString).collect(Collectors.joining(",
 "));
+                LOG.error("Errors found for request {}; details: {}", 
executionId, details);
+            }
+        }
+
+        @Override
+        public void afterBulk(
+                final long executionId,
+                final BulkRequest request,
+                final List<Void> contexts,
+                final Throwable failure) {
+            // the bulk request as a whole failed: log it together with its cause
+            LOG.error("Bulk request {} failed", executionId, failure);
+        }
+    }
+
     @Autowired
     protected ElasticsearchClient client;
 
@@ -125,70 +170,58 @@ public class ElasticsearchReindex extends 
AbstractSchedTaskJobDelegate<SchedTask
                 int users = userDAO.count();
                 String uindex = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
                 setStatus("Indexing " + users + " users under " + uindex + 
"...");
-                for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) 
+ 1; page++) {
-                    BulkRequest.Builder bulkRequest = new 
BulkRequest.Builder();
-
-                    for (String user : userDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
-                        bulkRequest.operations(op -> op.index(idx -> idx.
-                                index(uindex).
-                                id(user).
-                                document(utils.document(userDAO.find(user)))));
-                    }
 
-                    try {
-                        BulkResponse response = 
client.bulk(bulkRequest.build());
-                        LOG.debug("Index successfully created for {} [{}/{}]: 
{}",
-                                uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, 
response);
-                    } catch (Exception e) {
-                        LOG.error("Could not create index for {} [{}/{}]: {}",
-                                uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
+                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+                    // page through all user keys, queueing one index op per user
+                    for (int page = 1; page <= (users / 
AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
+                        for (String user : userDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
+                            ingester.add(op -> op.index(idx -> idx.
+                                    index(uindex).
+                                    id(user).
+                                    
document(utils.document(userDAO.find(user)))));
+                        }
                     }
+                } catch (Exception e) {
+                    LOG.error("Errors while ingesting index {}", uindex, e);
                 }
 
                 int groups = groupDAO.count();
                 String gindex = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
                 setStatus("Indexing " + groups + " groups under " + gindex + 
"...");
-                for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) 
+ 1; page++) {
-                    BulkRequest.Builder bulkRequest = new 
BulkRequest.Builder();
-
-                    for (String group : groupDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
-                        bulkRequest.operations(op -> op.index(idx -> idx.
-                                index(gindex).
-                                id(group).
-                                
document(utils.document(groupDAO.find(group)))));
-                    }
 
-                    try {
-                        BulkResponse response = 
client.bulk(bulkRequest.build());
-                        LOG.debug("Index successfully created for {} [{}/{}]: 
{}",
-                                gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, 
response);
-                    } catch (Exception e) {
-                        LOG.error("Could not create index for {} [{}/{}]: {}",
-                                gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
+                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+                    // page through all group keys, queueing one index op per group
+                    for (int page = 1; page <= (groups / 
AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
+                        for (String group : groupDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
+                            ingester.add(op -> op.index(idx -> idx.
+                                    index(gindex).
+                                    id(group).
+                                    
document(utils.document(groupDAO.find(group)))));
+                        }
                     }
+                } catch (Exception e) {
+                    LOG.error("Errors while ingesting index {}", gindex, e);
                 }
 
                 int anyObjects = anyObjectDAO.count();
                 String aindex = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), 
AnyTypeKind.ANY_OBJECT);
                 setStatus("Indexing " + anyObjects + " any objects under " + 
aindex + "...");
-                for (int page = 1; page <= (anyObjects / 
AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
-                    BulkRequest.Builder bulkRequest = new 
BulkRequest.Builder();
-
-                    for (String anyObject : anyObjectDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
-                        bulkRequest.operations(op -> op.index(idx -> idx.
-                                index(aindex).
-                                id(anyObject).
-                                
document(utils.document(anyObjectDAO.find(anyObject)))));
-                    }
 
-                    try {
-                        BulkResponse response = 
client.bulk(bulkRequest.build());
-                        LOG.debug("Index successfully created for {} [{}/{}]: 
{}",
-                                aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, 
response);
-                    } catch (Exception e) {
-                        LOG.error("Could not create index for {} [{}/{}]: {}",
-                                aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
+                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+                    // page through all any object keys, queueing one index op each
+                    for (int page = 1; page <= (anyObjects / 
AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
+                        for (String anyObject : anyObjectDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
+                            ingester.add(op -> op.index(idx -> idx.
+                                    index(aindex).
+                                    id(anyObject).
+                                    
document(utils.document(anyObjectDAO.find(anyObject)))));
+                        }
                     }
+                } catch (Exception e) {
+                    LOG.error("Errors while ingesting index {}", aindex, e);
                 }
 
                 indexManager.createRealmIndex(AuthContextUtils.getDomain(), 
realmSettings(), realmMapping());
@@ -196,24 +229,20 @@ public class ElasticsearchReindex extends 
AbstractSchedTaskJobDelegate<SchedTask
                 int realms = realmDAO.count();
                 String rindex = 
ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
                 setStatus("Indexing " + realms + " realms under " + rindex + 
"...");
-                for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) 
+ 1; page++) {
-                    BulkRequest.Builder bulkRequest = new 
BulkRequest.Builder();
-
-                    for (String realm : realmDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
-                        bulkRequest.operations(op -> op.index(idx -> idx.
-                                index(rindex).
-                                id(realm).
-                                
document(utils.document(realmDAO.find(realm)))));
-                    }
 
-                    try {
-                        BulkResponse response = 
client.bulk(bulkRequest.build());
-                        LOG.debug("Index successfully created for {} [{}/{}]: 
{}",
-                                rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, 
response);
-                    } catch (Exception e) {
-                        LOG.error("Could not create index for {} [{}/{}]: {}",
-                                rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
+                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+                    // page through all realm keys, queueing one index op per realm
+                    for (int page = 1; page <= (realms / 
AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
+                        for (String realm : realmDAO.findAllKeys(page, 
AnyDAO.DEFAULT_PAGE_SIZE)) {
+                            ingester.add(op -> op.index(idx -> idx.
+                                    index(rindex).
+                                    id(realm).
+                                    
document(utils.document(realmDAO.find(realm)))));
+                        }
                     }
+                } catch (Exception e) {
+                    LOG.error("Errors while ingesting index {}", rindex, e);
                 }
 
                 indexManager.createAuditIndex(AuthContextUtils.getDomain(), 
auditSettings(), auditMapping());
diff --git 
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java 
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java
index a3bcd2040d..7e900a4786 100644
--- 
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java
+++ 
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AuditITCase.java
@@ -686,8 +686,8 @@ public class AuditITCase extends AbstractITCase {
         root.getActions().add(logicActions.getKey());
         REALM_SERVICE.update(root);
 
-        int before = AUDIT_SERVICE.search(
-                new 
AuditQuery.Builder().type(AuditElements.EventCategoryType.CUSTOM).build()).getTotalCount();
+        AuditQuery query = new 
AuditQuery.Builder().type(AuditElements.EventCategoryType.CUSTOM).build();
+        int before = query(query, MAX_WAIT_SECONDS).size();
         try {
             
AUDIT_SERVICE.set(buildAuditConf("syncope.audit.[CUSTOM]:[]:[]:[MY_EVENT]:[SUCCESS]",
 true));
 
@@ -696,8 +696,7 @@ public class AuditITCase extends AbstractITCase {
                     plainAttr(attrAddReplacePatch("location", "new" + 
getUUIDString())).
                     build());
 
-            int after = AUDIT_SERVICE.search(
-                    new 
AuditQuery.Builder().type(AuditElements.EventCategoryType.CUSTOM).build()).getTotalCount();
+            int after = query(query, MAX_WAIT_SECONDS).size();
             assertEquals(before + 1, after);
         } finally {
             
AUDIT_SERVICE.set(buildAuditConf("syncope.audit.[CUSTOM]:[]:[]:[MY_EVENT]:[SUCCESS]",
 false));

Reply via email to