This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git
commit 38def2a8d7e8e40abf5f7787dbfa7e12ee59635d
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Mon Feb 3 16:52:21 2025 +0100

    Allow for easier customization in ElasticsearchReindex
---
 .../java/job/ElasticsearchReindex.java | 215 ++++++++++++---------
 pom.xml                                |  11 ++
 2 files changed, 133 insertions(+), 93 deletions(-)

diff --git a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
index 9535b55d86..84791947a6 100644
--- a/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
+++ b/ext/elasticsearch/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/ElasticsearchReindex.java
@@ -27,9 +27,11 @@ import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.elasticsearch.indices.IndexSettings;
+import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.syncope.common.lib.types.AnyTypeKind;
 import org.apache.syncope.core.persistence.api.dao.AnyDAO;
 import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
@@ -156,113 +158,140 @@ public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask
         return indexManager.defaultAuditMapping();
     }
 
+    protected Pair<String, Long> reindexRealms(final JobExecutionContext context) throws IOException {
+        indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
+
+        long count = realmDAO.count();
+        String index = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
+        setStatus("Indexing " + count + " realms under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
+                maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
+                Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (Realm realm : realmDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(realm.getKey()).
+                            document(utils.document(realm))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected Pair<String, Long> reindexUsers(final JobExecutionContext context) throws IOException {
+        indexManager.createAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
+
+        long count = userDAO.count();
+        String index = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
+        setStatus("Indexing " + count + " users under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
+                maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
+                Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (User user : userDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(user.getKey()).
+                            document(utils.document(user))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected Pair<String, Long> reindexGroups(final JobExecutionContext context) throws IOException {
+        indexManager.createAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
+
+        long count = groupDAO.count();
+        String index = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
+        setStatus("Indexing " + count + " groups under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
+                maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
+                Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (Group group : groupDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(group.getKey()).
+                            document(utils.document(group))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected Pair<String, Long> reindexAnyObjects(final JobExecutionContext context) throws IOException {
+        indexManager.createAnyIndex(
+                AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
+
+        long count = anyObjectDAO.count();
+        String index = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
+        setStatus("Indexing " + count + " any objects under " + index + "...");
+
+        try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
+                maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
+
+            for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
+                Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
+                for (AnyObject anyObject : anyObjectDAO.findAll(pageable)) {
+                    ingester.add(op -> op.index(idx -> idx.
+                            index(index).
+                            id(anyObject.getKey()).
+                            document(utils.document(anyObject))));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Errors while ingesting index {}", index, e);
+        }
+
+        return Pair.of(index, count);
+    }
+
+    protected String reindexAudit(final JobExecutionContext context) throws IOException {
+        indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
+        return ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain());
+    }
+
     @Override
     protected String doExecute(final JobExecutionContext context) throws JobExecutionException {
         if (!context.isDryRun()) {
             setStatus("Start rebuilding indexes");
 
             try {
-                indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
-
-                long realms = realmDAO.count();
-                String rindex = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
-                setStatus("Indexing " + realms + " realms under " + rindex + "...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
-                        maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
-
-                    for (int page = 0; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (Realm realm : realmDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(rindex).
-                                    id(realm.getKey()).
-                                    document(utils.document(realm))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", rindex, e);
-                }
+                Pair<String, Long> rindex = reindexRealms(context);
 
-                indexManager.createAnyIndex(
-                        AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
-
-                long users = userDAO.count();
-                String uindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
-                setStatus("Indexing " + users + " users under " + uindex + "...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
-                        maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
-
-                    for (int page = 0; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (User user : userDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(uindex).
-                                    id(user.getKey()).
-                                    document(utils.document(user))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", uindex, e);
-                }
+                Pair<String, Long> uindex = reindexUsers(context);
 
-                indexManager.createAnyIndex(
-                        AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
-
-                long groups = groupDAO.count();
-                String gindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
-                setStatus("Indexing " + groups + " groups under " + gindex + "...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
-                        maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
-
-                    for (int page = 0; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (Group group : groupDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(gindex).
-                                    id(group.getKey()).
-                                    document(utils.document(group))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", gindex, e);
-                }
+                Pair<String, Long> gindex = reindexGroups(context);
 
-                indexManager.createAnyIndex(
-                        AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
-
-                long anyObjects = anyObjectDAO.count();
-                String aindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
-                setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
-
-                try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
-                        maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
-
-                    for (int page = 0; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE); page++) {
-                        Pageable pageable = PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, DAO.DEFAULT_SORT);
-                        for (AnyObject anyObject : anyObjectDAO.findAll(pageable)) {
-                            ingester.add(op -> op.index(idx -> idx.
-                                    index(aindex).
-                                    id(anyObject.getKey()).
-                                    document(utils.document(anyObject))));
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("Errors while ingesting index {}", aindex, e);
-                }
+                Pair<String, Long> aindex = reindexAnyObjects(context);
 
-                indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
+                String audit = reindexAudit(context);
 
                 setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
 
                 return "Indexes created:\n"
-                        + " " + rindex + " [" + realms + "]\n"
-                        + " " + uindex + " [" + users + "]\n"
-                        + " " + gindex + " [" + groups + "]\n"
-                        + " " + aindex + " [" + anyObjects + "]\n"
-                        + " " + ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain());
+                        + " " + rindex.getLeft() + " [" + rindex.getRight() + "]\n"
+                        + " " + uindex.getLeft() + " [" + uindex.getRight() + "]\n"
+                        + " " + gindex.getLeft() + " [" + gindex.getRight() + "]\n"
+                        + " " + aindex.getLeft() + " [" + aindex.getRight() + "]\n"
+                        + " " + audit;
             } catch (Exception e) {
                 throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e);
             }
diff --git a/pom.xml b/pom.xml
index ab80acf2c5..d853d8ecf1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -822,6 +822,17 @@ under the License.
       <version>${spring-boot.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.tomcat.embed</groupId>
+      <artifactId>tomcat-embed-core</artifactId>
+      <version>${tomcat.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat.embed</groupId>
+      <artifactId>tomcat-embed-el</artifactId>
+      <version>${tomcat.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.cxf</groupId>
       <artifactId>cxf-spring-boot-starter-jaxrs</artifactId>
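With the reindex logic now split into protected per-phase methods (reindexRealms, reindexUsers, reindexGroups, reindexAnyObjects, reindexAudit), a deployment can customize a single phase by subclassing ElasticsearchReindex instead of replacing the whole doExecute body. The sketch below is illustrative only: it overrides reindexUsers, whose signature is taken from the diff above, to log how long user ingestion takes; the CustomElasticsearchReindex class name, the dedicated logger, and the JobExecutionContext import path are assumptions, not part of this commit.

package org.apache.syncope.core.provisioning.java.job;

import java.io.IOException;
import org.apache.commons.lang3.tuple.Pair;
// assumed import path for the JobExecutionContext used in the reindex* signatures
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical subclass: only the user phase is customized; every other phase
// keeps the behavior inherited from ElasticsearchReindex.
public class CustomElasticsearchReindex extends ElasticsearchReindex {

    private static final Logger TIMING_LOG = LoggerFactory.getLogger(CustomElasticsearchReindex.class);

    @Override
    protected Pair<String, Long> reindexUsers(final JobExecutionContext context) throws IOException {
        long start = System.currentTimeMillis();

        // delegate to the standard bulk-ingestion logic
        Pair<String, Long> result = super.reindexUsers(context);

        TIMING_LOG.info("Reindexed {} users into {} in {} ms",
                result.getRight(), result.getLeft(), System.currentTimeMillis() - start);
        return result;
    }
}

Since doExecute now only orchestrates the reindex* calls and assembles the result string, such a subclass can also skip or reorder entire phases without touching the BulkIngester code.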
