This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git

commit 38def2a8d7e8e40abf5f7787dbfa7e12ee59635d
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Mon Feb 3 16:52:21 2025 +0100

    Allow for easier customization in ElasticsearchReindex
---
 .../java/job/ElasticsearchReindex.java             | 215 ++++++++++++---------
 pom.xml                                            |  11 ++
 2 files changed, 133 insertions(+), 93 deletions(-)

diff --git 
a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
 
b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
index 9535b55d86..84791947a6 100644
--- 
a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
+++ 
b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
@@ -27,9 +27,11 @@ import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.elasticsearch.indices.IndexSettings;
+import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.syncope.common.lib.types.AnyTypeKind;
 import org.apache.syncope.core.persistence.api.dao.AnyDAO;
 import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
@@ -156,113 +158,140 @@ public class ElasticsearchReindex extends 
AbstractSchedTaskJobDelegate<SchedTask
         return indexManager.defaultAuditMapping();
     }
 
+    protected Pair<String, Long> reindexRealms(final JobExecutionContext 
context) throws IOException {
+        indexManager.createRealmIndex(AuthContextUtils.getDomain(), 
realmSettings(), realmMapping());
+
+        long count = realmDAO.count();
+        String index = 
ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
+        setStatus("Indexing " + count + " realms under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); 
page++) {
+                Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (Realm realm : realmDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(realm.getKey()).
+                            document(utils.document(realm))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected Pair<String, Long> reindexUsers(final JobExecutionContext 
context) throws IOException {
+        indexManager.createAnyIndex(AuthContextUtils.getDomain(), 
AnyTypeKind.USER, userSettings(), userMapping());
+
+        long count = userDAO.count();
+        String index = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
+        setStatus("Indexing " + count + " users under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); 
page++) {
+                Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (User user : userDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(user.getKey()).
+                            document(utils.document(user))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected Pair<String, Long> reindexGroups(final JobExecutionContext 
context) throws IOException {
+        indexManager.createAnyIndex(AuthContextUtils.getDomain(), 
AnyTypeKind.GROUP, groupSettings(), groupMapping());
+
+        long count = groupDAO.count();
+        String index = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
+        setStatus("Indexing " + count + " groups under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); 
page++) {
+                Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (Group group : groupDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(group.getKey()).
+                            document(utils.document(group))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected Pair<String, Long> reindexAnyObjects(final JobExecutionContext 
context) throws IOException {
+        indexManager.createAnyIndex(
+                AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, 
anyObjectSettings(), anyObjectMapping());
+
+        long count = anyObjectDAO.count();
+        String index = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), 
AnyTypeKind.ANY_OBJECT);
+        setStatus("Indexing " + count + " any objects under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
+                
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); 
page++) {
+                Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (AnyObject anyObject : anyObjectDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(anyObject.getKey()).
+                            document(utils.document(anyObject))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected String reindexAudit(final JobExecutionContext context) throws 
IOException {
+        indexManager.createAuditIndex(AuthContextUtils.getDomain(), 
auditSettings(), auditMapping());
+        return ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain());
+    }
+
     @Override
     protected String doExecute(final JobExecutionContext context) throws 
JobExecutionException {
         if (!context.isDryRun()) {
             setStatus("Start rebuilding indexes");
 
             try {
-                indexManager.createRealmIndex(AuthContextUtils.getDomain(), 
realmSettings(), realmMapping());
-
-                long realms = realmDAO.count();
-                String rindex = 
ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
-                setStatus("Indexing " + realms + " realms under " + rindex + 
"...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
-                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
-
-                    for (int page = 0; page <= (realms / 
AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (Realm realm : realmDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(rindex).
-                                    id(realm.getKey()).
-                                    document(utils.document(realm))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", rindex, e);
-                }
+                Pair<String, Long> rindex = reindexRealms(context);
 
-                indexManager.createAnyIndex(
-                        AuthContextUtils.getDomain(), AnyTypeKind.USER, 
userSettings(), userMapping());
-
-                long users = userDAO.count();
-                String uindex = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
-                setStatus("Indexing " + users + " users under " + uindex + 
"...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
-                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
-
-                    for (int page = 0; page <= (users / 
AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (User user : userDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(uindex).
-                                    id(user.getKey()).
-                                    document(utils.document(user))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", uindex, e);
-                }
+                Pair<String, Long> uindex = reindexUsers(context);
 
-                indexManager.createAnyIndex(
-                        AuthContextUtils.getDomain(), AnyTypeKind.GROUP, 
groupSettings(), groupMapping());
-
-                long groups = groupDAO.count();
-                String gindex = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
-                setStatus("Indexing " + groups + " groups under " + gindex + 
"...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
-                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
-
-                    for (int page = 0; page <= (groups / 
AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (Group group : groupDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(gindex).
-                                    id(group.getKey()).
-                                    document(utils.document(group))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", gindex, e);
-                }
+                Pair<String, Long> gindex = reindexGroups(context);
 
-                indexManager.createAnyIndex(
-                        AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, 
anyObjectSettings(), anyObjectMapping());
-
-                long anyObjects = anyObjectDAO.count();
-                String aindex = 
ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), 
AnyTypeKind.ANY_OBJECT);
-                setStatus("Indexing " + anyObjects + " any objects under " + 
aindex + "...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> 
b.client(client).
-                        
maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE)))
 {
-
-                    for (int page = 0; page <= (anyObjects / 
AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, 
AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (AnyObject anyObject : 
anyObjectDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(aindex).
-                                    id(anyObject.getKey()).
-                                    document(utils.document(anyObject))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", aindex, e);
-                }
+                Pair<String, Long> aindex = reindexAnyObjects(context);
 
-                indexManager.createAuditIndex(AuthContextUtils.getDomain(), 
auditSettings(), auditMapping());
+                String audit = reindexAudit(context);
 
                 setStatus("Rebuild indexes for domain " + 
AuthContextUtils.getDomain() + " successfully completed");
 
                 return "Indexes created:\n"
-                        + " " + rindex + " [" + realms + "]\n"
-                        + " " + uindex + " [" + users + "]\n"
-                        + " " + gindex + " [" + groups + "]\n"
-                        + " " + aindex + " [" + anyObjects + "]\n"
-                        + " " + 
ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain());
+                        + " " + rindex.getLeft() + " [" + rindex.getRight() + 
"]\n"
+                        + " " + uindex.getLeft() + " [" + uindex.getRight() + 
"]\n"
+                        + " " + gindex.getLeft() + " [" + gindex.getRight() + 
"]\n"
+                        + " " + aindex.getLeft() + " [" + aindex.getRight() + 
"]\n"
+                        + " " + audit;
             } catch (Exception e) {
                 throw new JobExecutionException("While rebuilding index for 
domain " + AuthContextUtils.getDomain(), e);
             }
diff --git a/pom.xml b/pom.xml
index ab80acf2c5..d853d8ecf1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -822,6 +822,17 @@ under the License.
         <version>${spring-boot.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.tomcat.embed</groupId>
+        <artifactId>tomcat-embed-core</artifactId>
+        <version>${tomcat.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tomcat.embed</groupId>
+        <artifactId>tomcat-embed-el</artifactId>
+        <version>${tomcat.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.cxf</groupId>
         <artifactId>cxf-spring-boot-starter-jaxrs</artifactId>

Reply via email to