keith-turner closed pull request #382: ACCUMULO-4782 switch session manager to 
a concurrent map
URL: https://github.com/apache/accumulo/pull/382
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 1d2d88d4aa..eed45cf073 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -20,10 +20,15 @@
 import org.apache.accumulo.server.rpc.TServerUtils;
 
 public class Session {
+
+  enum State {
+    NEW, UNRESERVED, RESERVED, REMOVED
+  }
+
   public final String client;
   long lastAccessTime;
   public long startTime;
-  boolean reserved;
+  State state = State.NEW;
   private final TCredentials credentials;
 
   Session(TCredentials credentials) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index bf37855bb4..b04a367561 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,6 +27,8 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.Translators;
@@ -39,18 +42,20 @@
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.session.Session.State;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 public class SessionManager {
   private static final Logger log = 
LoggerFactory.getLogger(SessionManager.class);
 
-  private final SecureRandom random = new SecureRandom();
-  private final Map<Long,Session> sessions = new HashMap<>();
+  private final SecureRandom random;
+  private final ConcurrentMap<Long,Session> sessions = new 
ConcurrentHashMap<>();
   private final long maxIdle;
   private final long maxUpdateIdle;
   private final List<Session> idleSessions = new ArrayList<>();
@@ -62,6 +67,16 @@ public SessionManager(AccumuloConfiguration conf) {
     maxUpdateIdle = 
conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
     maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 
+    SecureRandom sr;
+    try {
+      // This is faster than the default secure random which uses /dev/urandom
+      sr = SecureRandom.getInstance("SHA1PRNG");
+    } catch (NoSuchAlgorithmException e) {
+      log.debug("Unable to create SHA1PRNG secure random, using default");
+      sr = new SecureRandom();
+    }
+    random = sr;
+
     Runnable r = new Runnable() {
       @Override
       public void run() {
@@ -70,20 +85,21 @@ public void run() {
     };
 
     SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+
   }
 
-  public synchronized long createSession(Session session, boolean reserve) {
+  public long createSession(Session session, boolean reserve) {
     long sid = random.nextLong();
 
-    while (sessions.containsKey(sid)) {
-      sid = random.nextLong();
+    synchronized (session) {
+      Preconditions.checkArgument(session.state == State.NEW);
+      session.state = reserve ? State.RESERVED : State.UNRESERVED;
+      session.startTime = session.lastAccessTime = System.currentTimeMillis();
     }
 
-    sessions.put(sid, session);
-
-    session.reserved = reserve;
-
-    session.startTime = session.lastAccessTime = System.currentTimeMillis();
+    while (sessions.putIfAbsent(sid, session) != null) {
+      sid = random.nextLong();
+    }
 
     return sid;
   }
@@ -96,56 +112,83 @@ public long getMaxIdleTime() {
    * while a session is reserved, it cannot be canceled or removed
    */
 
-  public synchronized Session reserveSession(long sessionId) {
+  public Session reserveSession(long sessionId) {
     Session session = sessions.get(sessionId);
     if (session != null) {
-      if (session.reserved)
-        throw new IllegalStateException();
-      session.reserved = true;
+      synchronized (session) {
+        if (session.state == State.RESERVED)
+          throw new IllegalStateException("Attempted to reserved session that 
is already reserved " + sessionId);
+        if (session.state == State.REMOVED)
+          return null;
+        session.state = State.RESERVED;
+      }
     }
 
     return session;
 
   }
 
-  public synchronized Session reserveSession(long sessionId, boolean wait) {
+  public Session reserveSession(long sessionId, boolean wait) {
     Session session = sessions.get(sessionId);
+
     if (session != null) {
-      while (wait && session.reserved) {
-        try {
-          wait(1000);
-        } catch (InterruptedException e) {
-          throw new RuntimeException();
+      synchronized (session) {
+
+        if (session.state == State.REMOVED)
+          return null;
+
+        while (wait && session.state == State.RESERVED) {
+          try {
+            session.wait(1000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException();
+          }
         }
-      }
 
-      if (session.reserved)
-        throw new IllegalStateException();
-      session.reserved = true;
+        if (session.state == State.RESERVED)
+          throw new IllegalStateException("Attempted to reserved session that 
is already reserved " + sessionId);
+        if (session.state == State.REMOVED)
+          return null;
+        session.state = State.RESERVED;
+      }
     }
 
     return session;
 
   }
 
-  public synchronized void unreserveSession(Session session) {
-    if (!session.reserved)
-      throw new IllegalStateException();
-    notifyAll();
-    session.reserved = false;
-    session.lastAccessTime = System.currentTimeMillis();
+  public void unreserveSession(Session session) {
+    synchronized (session) {
+      if (session.state == State.REMOVED)
+        return;
+      if (session.state != State.RESERVED)
+        throw new IllegalStateException("Cannon unreserve, state: " + 
session.state);
+      session.notifyAll();
+      session.state = State.UNRESERVED;
+      session.lastAccessTime = System.currentTimeMillis();
+    }
   }
 
-  public synchronized void unreserveSession(long sessionId) {
+  public void unreserveSession(long sessionId) {
     Session session = getSession(sessionId);
-    if (session != null)
+    if (session != null) {
       unreserveSession(session);
+    }
+
   }
 
-  public synchronized Session getSession(long sessionId) {
+  public Session getSession(long sessionId) {
     Session session = sessions.get(sessionId);
-    if (session != null)
-      session.lastAccessTime = System.currentTimeMillis();
+
+    if (session != null) {
+      synchronized (session) {
+        if (session.state == State.REMOVED) {
+          return null;
+        }
+        session.lastAccessTime = System.currentTimeMillis();
+      }
+    }
+
     return session;
   }
 
@@ -154,35 +197,47 @@ public Session removeSession(long sessionId) {
   }
 
   public Session removeSession(long sessionId, boolean unreserve) {
-    Session session = null;
-    synchronized (this) {
-      session = sessions.remove(sessionId);
-      if (unreserve && session != null)
-        unreserveSession(session);
-    }
 
-    // do clean up out side of lock..
-    if (session != null)
-      session.cleanup();
+    Session session = sessions.remove(sessionId);
+    if (session != null) {
+      boolean doCleanup = false;
+      synchronized (session) {
+        if (session.state != State.REMOVED) {
+          if (unreserve) {
+            unreserveSession(session);
+          }
+          doCleanup = true;
+          session.state = State.REMOVED;
+        }
+      }
+
+      if (doCleanup) {
+        session.cleanup();
+      }
+    }
 
     return session;
   }
 
   private void sweep(final long maxIdle, final long maxUpdateIdle) {
     List<Session> sessionsToCleanup = new ArrayList<>();
-    synchronized (this) {
-      Iterator<Session> iter = sessions.values().iterator();
-      while (iter.hasNext()) {
-        Session session = iter.next();
-        long configuredIdle = maxIdle;
-        if (session instanceof UpdateSession) {
-          configuredIdle = maxUpdateIdle;
-        }
-        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-        if (idleTime > configuredIdle && !session.reserved) {
-          log.info("Closing idle session from user=" + session.getUser() + ", 
client=" + session.client + ", idle=" + idleTime + "ms");
-          iter.remove();
-          sessionsToCleanup.add(session);
+
+    Iterator<Session> iter = sessions.values().iterator();
+    while (iter.hasNext()) {
+      Session session = iter.next();
+      synchronized (session) {
+        if (session.state == State.UNRESERVED) {
+          long configuredIdle = maxIdle;
+          if (session instanceof UpdateSession) {
+            configuredIdle = maxUpdateIdle;
+          }
+          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+          if (idleTime > configuredIdle) {
+            log.info("Closing idle session from user=" + session.getUser() + 
", client=" + session.client + ", idle=" + idleTime + "ms");
+            iter.remove();
+            sessionsToCleanup.add(session);
+            session.state = State.REMOVED;
+          }
         }
       }
     }
@@ -190,39 +245,47 @@ private void sweep(final long maxIdle, final long 
maxUpdateIdle) {
     // do clean up outside of lock for TabletServer in a synchronized block 
for simplicity vice a synchronized list
 
     synchronized (idleSessions) {
-
       sessionsToCleanup.addAll(idleSessions);
-
       idleSessions.clear();
+    }
 
-      // perform cleanup for all of the sessions
-      for (Session session : sessionsToCleanup) {
-        if (!session.cleanup())
+    // perform cleanup for all of the sessions
+    for (Session session : sessionsToCleanup) {
+      if (!session.cleanup())
+        synchronized (idleSessions) {
           idleSessions.add(session);
-      }
+        }
     }
+
   }
 
-  public synchronized void removeIfNotAccessed(final long sessionId, final 
long delay) {
+  public void removeIfNotAccessed(final long sessionId, final long delay) {
     Session session = sessions.get(sessionId);
     if (session != null) {
-      final long removeTime = session.lastAccessTime;
+      long tmp;
+      synchronized (session) {
+        tmp = session.lastAccessTime;
+      }
+      final long removeTime = tmp;
       TimerTask r = new TimerTask() {
         @Override
         public void run() {
-          Session sessionToCleanup = null;
-          synchronized (SessionManager.this) {
-            Session session2 = sessions.get(sessionId);
-            if (session2 != null && session2.lastAccessTime == removeTime && 
!session2.reserved) {
+          Session session2 = sessions.get(sessionId);
+          if (session2 != null) {
+            boolean shouldRemove = false;
+            synchronized (session2) {
+              if (session2.lastAccessTime == removeTime && session2.state == 
State.UNRESERVED) {
+                session2.state = State.REMOVED;
+                shouldRemove = true;
+              }
+            }
+
+            if (shouldRemove) {
               log.info("Closing not accessed session from user=" + 
session2.getUser() + ", client=" + session2.client + ", duration=" + delay + 
"ms");
               sessions.remove(sessionId);
-              sessionToCleanup = session2;
+              session2.cleanup();
             }
           }
-
-          // call clean up outside of lock
-          if (sessionToCleanup != null)
-            sessionToCleanup.cleanup();
         }
       };
 
@@ -230,7 +293,7 @@ public void run() {
     }
   }
 
-  public synchronized Map<String,MapCounter<ScanRunState>> 
getActiveScansPerTable() {
+  public Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
     Map<String,MapCounter<ScanRunState>> counts = new HashMap<>();
     Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
 
@@ -243,9 +306,10 @@ public void run() {
       }
     }
 
-    for (Entry<Long,Session> entry : sessions.entrySet()) {
+    for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), 
copiedIdleSessions)) {
 
       Session session = entry.getValue();
+
       @SuppressWarnings("rawtypes")
       ScanTask nbt = null;
       String tableID = null;
@@ -280,7 +344,7 @@ public void run() {
     return counts;
   }
 
-  public synchronized List<ActiveScan> getActiveScans() {
+  public List<ActiveScan> getActiveScans() {
 
     final List<ActiveScan> activeScans = new ArrayList<>();
     final long ct = System.currentTimeMillis();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to