This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4ab5e91 UNOMI-741: use new strategy to retrieve session by always 
asking latest available rollover index (#586)
6b4ab5e91 is described below

commit 6b4ab5e91632dd4ec94389f174026334d5c7a0de
Author: kevan Jahanshahi <jke...@apache.org>
AuthorDate: Thu Mar 9 17:56:03 2023 +0100

    UNOMI-741: use new strategy to retrieve session by always asking latest 
available rollover index (#586)
---
 .../ElasticSearchPersistenceServiceImpl.java       | 47 ++++++++++++++++------
 1 file changed, 34 insertions(+), 13 deletions(-)

diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index f0929887b..ee3f0fc02 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -50,9 +50,11 @@ import 
org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import 
org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
 import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import 
org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@@ -88,6 +90,7 @@ import org.elasticsearch.client.indices.GetMappingsResponse;
 import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
 import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutMappingRequest;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -149,7 +152,6 @@ import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.X509Certificate;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -157,18 +159,15 @@ import static 
org.elasticsearch.index.query.QueryBuilders.termQuery;
 @SuppressWarnings("rawtypes")
 public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService, SynchronousBundleListener {
 
-    public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = 
"bulkProcessor.concurrentRequests";
-    public static final String BULK_PROCESSOR_BULK_ACTIONS = 
"bulkProcessor.bulkActions";
     public static final String BULK_PROCESSOR_BULK_SIZE = 
"bulkProcessor.bulkSize";
     public static final String BULK_PROCESSOR_FLUSH_INTERVAL = 
"bulkProcessor.flushInterval";
     public static final String BULK_PROCESSOR_BACKOFF_POLICY = 
"bulkProcessor.backoffPolicy";
-    public static final String MONTHLY_INDEX_ITEMS_MONTHLY_INDEXED = 
"monthlyIndex.itemsMonthlyIndexedOverride";
-    public static final String INDEX_DATE_PREFIX = "date-";
     public static final String SEQ_NO = "seq_no";
     public static final String PRIMARY_TERM = "primary_term";
 
     private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
     private static final String ROLLOVER_LIFECYCLE_NAME = 
"unomi-rollover-policy";
+
     private boolean throwExceptions = false;
     private RestHighLevelClient client;
     private BulkProcessor bulkProcessor;
@@ -191,7 +190,6 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
     private ConditionESQueryBuilderDispatcher 
conditionESQueryBuilderDispatcher;
     private List<String> itemsMonthlyIndexed;
     private Map<String, String> routingByType;
-    private final Map<String, String> sessionAffinityCache = new 
ConcurrentHashMap<>();
 
     private Integer defaultQueryLimit = 10;
     private Integer removeByQueryTimeoutInMinutes = 10;
@@ -203,6 +201,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
     private String bulkProcessorBackoffPolicy = "exponential";
 
     // Rollover configuration
+    private String sessionLatestIndex;
     private List<String> rolloverIndices;
     private String rolloverMaxSize;
     private String rolloverMaxAge;
@@ -502,12 +501,24 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                     bulkProcessor = getBulkProcessor();
                 }
 
+                // Wait for green
                 logger.info("Waiting for GREEN cluster status...");
-
                 client.cluster().health(new 
ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT);
-
                 logger.info("Cluster status is GREEN");
 
+                // We keep in memory the latest available session index to be 
able to load session using direct GET access on ES
+                if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
+                    logger.info("Sessions are using rollover indices, loading 
latest session index available ...");
+                    GetAliasesResponse sessionAliasResponse = 
client.indices().getAlias(new GetAliasesRequest(getIndex(Session.ITEM_TYPE)), 
RequestOptions.DEFAULT);
+                    Map<String, Set<AliasMetaData>> aliases = 
sessionAliasResponse.getAliases();
+                    if (!aliases.isEmpty()) {
+                        sessionLatestIndex = new 
TreeSet<>(aliases.keySet()).last();
+                        logger.info("Latest available session index found is: 
{}", sessionLatestIndex);
+                    } else {
+                        throw new IllegalStateException("No index found for 
sessions");
+                    }
+                }
+
                 return true;
             }
         }.executeInClassLoader();
@@ -814,8 +825,8 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
                     }
                     String documentId = getDocumentIDForItemType(itemId, 
itemType);
 
-                    String affinityIndex = "session".equals(itemType) && 
sessionAffinityCache.containsKey(documentId) ? 
sessionAffinityCache.get(documentId) : null;
-                    if (affinityIndex == null && 
isItemTypeRollingOver(itemType)) {
+                    boolean sessionSpecialDirectAccess = sessionLatestIndex != 
null && Session.ITEM_TYPE.equals(itemType) ;
+                    if (!sessionSpecialDirectAccess && 
isItemTypeRollingOver(itemType)) {
                         return new MetricAdapter<T>(metricsService, 
".loadItemWithQuery") {
                             @Override
                             public T execute(Object... args) throws Exception {
@@ -834,7 +845,8 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
                             }
                         }.execute();
                     } else {
-                        GetRequest getRequest = new GetRequest(affinityIndex 
!= null ? affinityIndex : getIndex(itemType), documentId);
+                        // Special handling for sessions: query the latest 
available index directly to speed up session loading
+                        GetRequest getRequest = new 
GetRequest(sessionSpecialDirectAccess ? sessionLatestIndex : 
getIndex(itemType), documentId);
                         GetResponse response = client.get(getRequest, 
RequestOptions.DEFAULT);
                         if (response.isExists()) {
                             String sourceAsString = 
response.getSourceAsString();
@@ -924,7 +936,16 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                         if (bulkProcessor == null || !useBatching) {
                             
indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
                             IndexResponse response = 
client.index(indexRequest, RequestOptions.DEFAULT);
-                            setMetadata(item, response.getVersion(), 
response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+                            String responseIndex = response.getIndex();
+                            setMetadata(item, response.getVersion(), 
response.getSeqNo(), response.getPrimaryTerm(), responseIndex);
+
+                            // Special handling for sessions: when a new 
session is created, check whether a rollover happened and, if so, update the 
latest available index
+                            if (Session.ITEM_TYPE.equals(itemType) &&
+                                    sessionLatestIndex != null &&
+                                    
response.getResult().equals(DocWriteResponse.Result.CREATED) &&
+                                    !responseIndex.equals(sessionLatestIndex)) 
{
+                                sessionLatestIndex = responseIndex;
+                            }
                         } else {
                             bulkProcessor.add(indexRequest);
                         }
@@ -1000,7 +1021,7 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
     private UpdateRequest createUpdateRequest(Class clazz, Item item, Map 
source, boolean alwaysOverwrite) {
         String itemType = Item.getItemType(clazz);
         String documentId = getDocumentIDForItemType(item.getItemId(), 
itemType);
-        String index = getIndex(itemType);
+        String index =  item.getSystemMetadata("index") != null ? (String) 
item.getSystemMetadata("index") : getIndex(itemType);
 
         UpdateRequest updateRequest = new UpdateRequest(index, documentId);
         updateRequest.doc(source);

Reply via email to