This is an automated email from the ASF dual-hosted git repository. ilgrosso pushed a commit to branch 3_0_X in repository https://gitbox.apache.org/repos/asf/syncope.git
commit ba11908855b40bbcbbbc541dd761903604739976 Author: Francesco Chicchiriccò <[email protected]> AuthorDate: Mon Feb 3 16:52:21 2025 +0100 Allow for easier customization in ElasticsearchReindex --- .../java/job/ElasticsearchReindex.java | 206 ++++++++++++--------- pom.xml | 11 ++ 2 files changed, 128 insertions(+), 89 deletions(-) diff --git a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java index eba46b5a44..8e9d98085e 100644 --- a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java +++ b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.syncope.common.lib.types.AnyTypeKind; import org.apache.syncope.core.persistence.api.dao.AnyDAO; import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO; @@ -150,6 +151,112 @@ public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask return indexManager.defaultAuditMapping(); } + protected Pair<String, Integer> reindexRealms() throws IOException { + indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping()); + + int count = realmDAO.count(); + String index = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain()); + setStatus("Indexing " + count + " realms under " + index + "..."); + + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(index). + id(realm). + document(utils.document(realmDAO.find(realm))))); + } + } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", index, e); + } + + return Pair.of(index, count); + } + + protected Pair<String, Integer> reindexUsers() throws IOException { + indexManager.createAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping()); + + int count = userDAO.count(); + String index = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER); + setStatus("Indexing " + count + " users under " + index + "..."); + + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(index). + id(user). + document(utils.document(userDAO.find(user))))); + } + } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", index, e); + } + + return Pair.of(index, count); + } + + protected Pair<String, Integer> reindexGroups() throws IOException { + indexManager.createAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping()); + + int count = groupDAO.count(); + String index = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP); + setStatus("Indexing " + count + " groups under " + index + "..."); + + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(index). + id(group). + document(utils.document(groupDAO.find(group))))); + } + } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", index, e); + } + + return Pair.of(index, count); + } + + protected Pair<String, Integer> reindexAnyObjects() throws IOException { + indexManager.createAnyIndex( + AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping()); + + int count = anyObjectDAO.count(); + String index = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT); + setStatus("Indexing " + count + " any objects under " + index + "..."); + + try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). + maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { + + for (int page = 1; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { + for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { + ingester.add(op -> op.index(idx -> idx. + index(index). + id(anyObject). + document(utils.document(anyObjectDAO.find(anyObject))))); + } + } + } catch (Exception e) { + LOG.error("Errors while ingesting index {}", index, e); + } + + return Pair.of(index, count); + } + + protected String reindexAudit() throws IOException { + indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping()); + return ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain()); + } + @Override protected String doExecute(final boolean dryRun, final String executor, final JobExecutionContext context) throws JobExecutionException { @@ -158,103 +265,24 @@ public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask setStatus("Start rebuilding indexes"); try { - indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping()); - - int realms = realmDAO.count(); - String rindex = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain()); - setStatus("Indexing " + realms + " realms under " + rindex + "..."); - - try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). - maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { - - for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - ingester.add(op -> op.index(idx -> idx. - index(rindex). - id(realm). - document(utils.document(realmDAO.find(realm))))); - } - } - } catch (Exception e) { - LOG.error("Errors while ingesting index {}", rindex, e); - } + Pair<String, Integer> rindex = reindexRealms(); - indexManager.createAnyIndex( - AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping()); - - int users = userDAO.count(); - String uindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER); - setStatus("Indexing " + users + " users under " + uindex + "..."); - - try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). - maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { - - for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - ingester.add(op -> op.index(idx -> idx. - index(uindex). - id(user). - document(utils.document(userDAO.find(user))))); - } - } - } catch (Exception e) { - LOG.error("Errors while ingesting index {}", uindex, e); - } + Pair<String, Integer> uindex = reindexUsers(); - indexManager.createAnyIndex( - AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping()); - - int groups = groupDAO.count(); - String gindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP); - setStatus("Indexing " + groups + " groups under " + gindex + "..."); - - try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). - maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { - - for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - ingester.add(op -> op.index(idx -> idx. - index(gindex). - id(group). - document(utils.document(groupDAO.find(group))))); - } - } - } catch (Exception e) { - LOG.error("Errors while ingesting index {}", gindex, e); - } + Pair<String, Integer> gindex = reindexGroups(); - indexManager.createAnyIndex( - AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping()); - - int anyObjects = anyObjectDAO.count(); - String aindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT); - setStatus("Indexing " + anyObjects + " any objects under " + aindex + "..."); - - try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client). - maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) { - - for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) { - for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) { - ingester.add(op -> op.index(idx -> idx. - index(aindex). - id(anyObject). - document(utils.document(anyObjectDAO.find(anyObject))))); - } - } - } catch (Exception e) { - LOG.error("Errors while ingesting index {}", aindex, e); - } + Pair<String, Integer> aindex = reindexAnyObjects(); - indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping()); + String audit = reindexAudit(); setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed"); return "Indexes created:\n" - + " " + rindex + " [" + realms + "]\n" - + " " + uindex + " [" + users + "]\n" - + " " + gindex + " [" + groups + "]\n" - + " " + aindex + " [" + anyObjects + "]\n" - + " " + ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain()); + + " " + rindex.getLeft() + " [" + rindex.getRight() + "]\n" + + " " + uindex.getLeft() + " [" + uindex.getRight() + "]\n" + + " " + gindex.getLeft() + " [" + gindex.getRight() + "]\n" + + " " + aindex.getLeft() + " [" + aindex.getRight() + "]\n" + + " " + audit; } catch (Exception e) { throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e); } diff --git a/pom.xml b/pom.xml index d23d212932..dad21ba799 100644 --- a/pom.xml +++ b/pom.xml @@ -897,6 +897,17 @@ under the License. <version>${spring-boot.version}</version> </dependency> + <dependency> + <groupId>org.apache.tomcat.embed</groupId> + <artifactId>tomcat-embed-core</artifactId> + <version>${tomcat.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat.embed</groupId> + <artifactId>tomcat-embed-el</artifactId> + <version>${tomcat.version}</version> + </dependency> + <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-spring-boot-starter-jaxrs</artifactId>
