This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push: new 6b4ab5e91 UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586) 6b4ab5e91 is described below commit 6b4ab5e91632dd4ec94389f174026334d5c7a0de Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Thu Mar 9 17:56:03 2023 +0100 UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586) --- .../ElasticSearchPersistenceServiceImpl.java | 47 ++++++++++++++++------ 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index f0929887b..ee3f0fc02 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -50,9 +50,11 @@ import org.apache.unomi.persistence.spi.aggregate.TermsAggregate; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; @@ -88,6 +90,7 @@ import org.elasticsearch.client.indices.GetMappingsResponse; import org.elasticsearch.client.indices.IndexTemplatesExistRequest; import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -149,7 +152,6 @@ import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.X509Certificate; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.termQuery; @@ -157,18 +159,15 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; @SuppressWarnings("rawtypes") public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener { - public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests"; - public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions"; public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize"; public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; - public static final String MONTHLY_INDEX_ITEMS_MONTHLY_INDEXED = "monthlyIndex.itemsMonthlyIndexedOverride"; - public static final String INDEX_DATE_PREFIX = "date-"; public static final String SEQ_NO = "seq_no"; public static final String PRIMARY_TERM = "primary_term"; private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy"; + private boolean throwExceptions = false; private RestHighLevelClient client; private BulkProcessor bulkProcessor; @@ -191,7 +190,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher; private List<String> itemsMonthlyIndexed; private Map<String, String> routingByType; - private final Map<String, String> sessionAffinityCache = new ConcurrentHashMap<>(); private Integer defaultQueryLimit = 10; private Integer removeByQueryTimeoutInMinutes = 10; @@ -203,6 +201,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String bulkProcessorBackoffPolicy = "exponential"; // Rollover configuration + private String sessionLatestIndex; private List<String> rolloverIndices; private String rolloverMaxSize; private String rolloverMaxAge; @@ -502,12 +501,24 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bulkProcessor = getBulkProcessor(); } + // Wait for green logger.info("Waiting for GREEN cluster status..."); - client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT); - logger.info("Cluster status is GREEN"); + // We keep in memory the latest available session index to be able to load session using direct GET access on ES + if (isItemTypeRollingOver(Session.ITEM_TYPE)) { + logger.info("Sessions are using rollover indices, loading latest session index available ..."); + GetAliasesResponse sessionAliasResponse = client.indices().getAlias(new GetAliasesRequest(getIndex(Session.ITEM_TYPE)), RequestOptions.DEFAULT); + Map<String, Set<AliasMetaData>> aliases = sessionAliasResponse.getAliases(); + if (!aliases.isEmpty()) { + sessionLatestIndex = new TreeSet<>(aliases.keySet()).last(); + logger.info("Latest available session index found is: {}", sessionLatestIndex); + } else { + throw new IllegalStateException("No index found for sessions"); + } + } + return true; } }.executeInClassLoader(); @@ -814,8 +825,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } String documentId = getDocumentIDForItemType(itemId, itemType); - String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(documentId) ? sessionAffinityCache.get(documentId) : null; - if (affinityIndex == null && isItemTypeRollingOver(itemType)) { + boolean sessionSpecialDirectAccess = sessionLatestIndex != null && Session.ITEM_TYPE.equals(itemType) ; + if (!sessionSpecialDirectAccess && isItemTypeRollingOver(itemType)) { return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") { @Override public T execute(Object... args) throws Exception { @@ -834,7 +845,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }.execute(); } else { - GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), documentId); + // Special handling for session we check the latest available index directly to speed up session loading + GetRequest getRequest = new GetRequest(sessionSpecialDirectAccess ? sessionLatestIndex : getIndex(itemType), documentId); GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); if (response.isExists()) { String sourceAsString = response.getSourceAsString(); @@ -924,7 +936,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor == null || !useBatching) { indexRequest.setRefreshPolicy(getRefreshPolicy(itemType)); IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); + String responseIndex = response.getIndex(); + setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), responseIndex); + + // Special handling for session, in case of new session we check that a rollover happen or not to update the latest available index + if (Session.ITEM_TYPE.equals(itemType) && + sessionLatestIndex != null && + response.getResult().equals(DocWriteResponse.Result.CREATED) && + !responseIndex.equals(sessionLatestIndex)) { + sessionLatestIndex = responseIndex; + } } else { bulkProcessor.add(indexRequest); } @@ -1000,7 +1021,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) { String itemType = Item.getItemType(clazz); String documentId = getDocumentIDForItemType(item.getItemId(), itemType); - String index = getIndex(itemType); + String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType); UpdateRequest updateRequest = new UpdateRequest(index, documentId); updateRequest.doc(source);