murblanc commented on a change in pull request #1504: URL: https://github.com/apache/lucene-solr/pull/1504#discussion_r427923626
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java ########## @@ -429,87 +440,124 @@ private void release(SessionWrapper sessionWrapper) { * The session can be used by others while the caller is performing operations */ private void returnSession(SessionWrapper sessionWrapper) { - TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME; + boolean present; synchronized (lockObj) { sessionWrapper.status = Status.EXECUTING; - if (log.isDebugEnabled()) { - log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} " - , time(timeSource, MILLISECONDS), - sessionWrapper.createTime, - this.sessionWrapper.createTime); - } - if (sessionWrapper.createTime == this.sessionWrapper.createTime) { - //this session was used for computing new operations and this can now be used for other - // computing - this.sessionWrapper = sessionWrapper; + present = sessionWrapperSet.contains(sessionWrapper); - //one thread who is waiting for this need to be notified. 
- lockObj.notify(); - } else { - log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime); - //else just ignore it - } + // wake up single thread waiting for a session return (ok if not woken up, wait is short) + lockObj.notify(); } + // Logging + if (present) { + if (log.isDebugEnabled()) { + log.debug("returnSession {}", sessionWrapper.getCreateTime()); + } + } else { + log.warn("returning unknown session {} ", sessionWrapper.getCreateTime()); + } } - public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException { + public SessionWrapper get(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException { TimeSource timeSource = cloudManager.getTimeSource(); + long oldestUpdateTimeNs = TimeUnit.SECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS) - SESSION_EXPIRY; + int zkVersion = cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion(); + synchronized (lockObj) { - if (sessionWrapper.status == Status.NULL || - sessionWrapper.zkVersion != cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion() || - TimeUnit.SECONDS.convert(timeSource.getTimeNs() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) { - //no session available or the session is expired + // If nothing in the cache can possibly work, create a new session + if (!hasNonExpiredSession(zkVersion, oldestUpdateTimeNs)) { return createSession(cloudManager); - } else { + } + + // Try to find a session available right away + SessionWrapper sw = getAvailableSession(zkVersion, oldestUpdateTimeNs); + + if (sw != null) { + if (log.isDebugEnabled()) { + log.debug("reusing session {}", sw.getCreateTime()); + } + return sw; + } else if (allowWait) { + // No session available, but if we wait a bit, maybe one can become available + // wait 1 to 10 secs in case a session is returned. 
Random to spread wakeup otherwise sessions not reused + long waitForMs = (long) (Math.random() * 9 * 1000 + 1000); + + if (log.isDebugEnabled()) { + log.debug("No sessions are available, all busy COMPUTING. starting wait of {}ms", waitForMs); + } long waitStart = time(timeSource, MILLISECONDS); - //the session is not expired - log.debug("reusing a session {}", this.sessionWrapper.createTime); - if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) { - this.sessionWrapper.status = Status.COMPUTING; - return sessionWrapper; - } else { - //status= COMPUTING it's being used for computing. computing is - if (log.isDebugEnabled()) { - log.debug("session being used. waiting... current time {} ", time(timeSource, MILLISECONDS)); - } - try { - lockObj.wait(10 * 1000);//wait for a max of 10 seconds - } catch (InterruptedException e) { - log.info("interrupted... "); - } + try { + lockObj.wait(waitForMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (log.isDebugEnabled()) { + log.debug("out of waiting. wait of {}ms, actual time elapsed {}ms", waitForMs, timeElapsed(timeSource, waitStart, MILLISECONDS)); + } + + // Try again to find an available session + sw = getAvailableSession(zkVersion, oldestUpdateTimeNs); + + if (sw != null) { if (log.isDebugEnabled()) { - log.debug("out of waiting curr-time:{} time-elapsed {}" - , time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS)); - } - // now this thread has woken up because it got timed out after 10 seconds or it is notified after - // the session was returned from another COMPUTING operation - if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) { - log.debug("Wait over. reusing the existing session "); - this.sessionWrapper.status = Status.COMPUTING; - return sessionWrapper; - } else { - //create a new Session - return createSession(cloudManager); + log.debug("Wait over. 
reusing an existing session {}", sw.getCreateTime()); } + return sw; + } else { + return createSession(cloudManager); } + } else { + return createSession(cloudManager); } } } + /** + * Returns an available session from the cache (the best one once cache strategies are defined), or null if no session + * from the cache is available (i.e. all are still COMPUTING, are too old, wrong zk version or the cache is empty).<p> + * This method must be called while holding the monitor on {@link #lockObj}.<p> + * The method updates the session status to computing. + */ + private SessionWrapper getAvailableSession(int zkVersion, long oldestUpdateTimeNs) { + for (SessionWrapper sw : sessionWrapperSet) { + if (sw.status == Status.EXECUTING && sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) { + sw.status = Status.COMPUTING; + return sw; + } + } + return null; + } + + /** + * Returns true if there's a session in the cache that could be returned (if it was free). This is required to + * know if there's any point in waiting or if a new session should better be created right away. + */ + private boolean hasNonExpiredSession(int zkVersion, long oldestUpdateTimeNs) { Review comment: Suggest renaming this method to hasCandidateSession ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org