Repository: lens Updated Branches: refs/heads/master fe66131a4 -> 98990c39f
LENS-1379 : Fix session expiry for sessions in which operations were done Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/98990c39 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/98990c39 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/98990c39 Branch: refs/heads/master Commit: 98990c39f4f4826beaf59afb0ef9961f566000c3 Parents: fe66131 Author: Amareshwari Sriramadasu <amareshw...@gmail.com> Authored: Mon Jan 2 17:45:24 2017 +0530 Committer: Puneet <puneet.gu...@inmobi.com> Committed: Mon Jan 2 17:45:24 2017 +0530 ---------------------------------------------------------------------- .../lens/server/session/HiveSessionService.java | 58 ++++++++-------- .../lens/server/session/LensSessionImpl.java | 20 +++--- .../TestQueryIndependenceFromSessionClose.java | 71 +++++++++++++++++--- 3 files changed, 102 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java index 21e2a62..b480d14 100644 --- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java +++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java @@ -276,17 +276,19 @@ public class HiveSessionService extends BaseLensService implements SessionServic */ @Override public void setSessionParameter(LensSessionHandle sessionid, String key, String value) { - setSessionParameter(sessionid, key, value, true); + HashMap<String, String> config = Maps.newHashMap(); + config.put(key, value); + setSessionParameters(sessionid, config); } + /** * Sets the session parameter. * * @param sessionid the sessionid * @param config map of string-string. each entry represents key and the value to be set for that key - * @param addToSession the add to session */ - protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config, boolean addToSession) { + protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config) { log.info("Request to Set params:" + config); try { acquire(sessionid); @@ -297,17 +299,11 @@ public class HiveSessionService extends BaseLensService implements SessionServic var = var.substring(SystemVariables.HIVECONF_PREFIX.length()); } getSession(sessionid).getSessionConf().set(var, entry.getValue()); - if (addToSession) { - String command = "set" + " " + entry.getKey() + "= " + entry.getValue(); - closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null)); - } else { - getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue()); - } + String command = "set" + " " + entry.getKey() + "= " + entry.getValue(); + closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null)); } // add to persist - if (addToSession) { - getSession(sessionid).setConfig(config); - } + getSession(sessionid).setConfig(config); log.info("Set params:" + config); } catch (HiveSQLException e) { throw new WebApplicationException(e); @@ -315,18 +311,18 @@ public class HiveSessionService extends BaseLensService implements SessionServic release(sessionid); } } - /** - * Sets the session parameter. - * - * @param sessionid the sessionid - * @param key the key - * @param value the value - * @param addToSession the add to session - */ - protected void setSessionParameter(LensSessionHandle sessionid, String key, String value, boolean addToSession) { - HashMap<String, String> config = Maps.newHashMap(); - config.put(key, value); - setSessionParameters(sessionid, config, addToSession); + + private void setSessionParametersOnRestore(LensSessionHandle sessionid, Map<String, String> config) { + // set in session conf + for(Map.Entry<String, String> entry: config.entrySet()) { + String var = entry.getKey(); + if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) { + var = var.substring(SystemVariables.HIVECONF_PREFIX.length()); + } + getSession(sessionid).getSessionConf().set(var, entry.getValue()); + getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue()); + } + log.info("Set params on restart:" + config); } /* @@ -367,7 +363,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic LensSessionHandle sessionHandle = persistInfo.getSessionHandle(); restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword()); LensSessionImpl session = getSession(sessionHandle); - session.setLastAccessTime(persistInfo.getLastAccessTime()); + session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime()); session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig()); session.getLensSessionPersistInfo().setResources(persistInfo.getResources()); session.setCurrentDatabase(persistInfo.getDatabase()); @@ -384,7 +380,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic // Add config for restored sessions try{ - setSessionParameters(sessionHandle, session.getConfig(), false); + setSessionParametersOnRestore(sessionHandle, session.getConfig()); } catch (Exception e) { log.error("Error setting parameters " + session.getConfig() + " for session: " + session, e); @@ -504,7 +500,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic } } - Runnable getSessionExpiryRunnable() { + public Runnable getSessionExpiryRunnable() { return sessionExpiryRunnable; } @@ -517,7 +513,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic * Run internal. */ public void runInternal() { - List<LensSessionHandle> sessionsToRemove = new ArrayList<LensSessionHandle>(SESSION_MAP.values()); + List<LensSessionHandle> sessionsToRemove = new ArrayList<>(SESSION_MAP.values()); Iterator<LensSessionHandle> itr = sessionsToRemove.iterator(); while (itr.hasNext()) { LensSessionHandle sessionHandle = itr.next(); @@ -527,10 +523,12 @@ public class HiveSessionService extends BaseLensService implements SessionServic itr.remove(); } } catch (ClientErrorException nfe) { + log.error("Error getting session " + sessionHandle.getPublicId(), nfe); itr.remove(); } } + log.info("Sessions to remove : {} out of {} all sessions", sessionsToRemove.size(), SESSION_MAP.size()); // Now close all inactive sessions for (LensSessionHandle sessionHandle : sessionsToRemove) { try { @@ -540,6 +538,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic + new Date(lastAccessTime)); notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle)); } catch (ClientErrorException nfe) { + log.error("Error getting session " + sessionHandle.getPublicId(), nfe); // Do nothing } catch (LensException e) { log.error("Error closing session " + sessionHandle.getPublicId() + " reason " + e.getMessage(), e); @@ -555,9 +554,10 @@ public class HiveSessionService extends BaseLensService implements SessionServic @Override public void run() { try { + log.info("Running session expiry run"); runInternal(); } catch (Exception e) { - log.warn("Unknown error while checking for inactive sessions - " + e.getMessage()); + log.warn("Unknown error while checking for inactive sessions - ", e); } } } http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java index 34c901c..08a5cff 100644 --- a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java @@ -62,9 +62,6 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable { /** The persist info. */ private LensSessionPersistInfo persistInfo = new LensSessionPersistInfo(); - /** The last access time. */ - private long lastAccessTime = System.currentTimeMillis(); - /** The session timeout. */ private long sessionTimeout; private static class IntegerThreadLocal extends ThreadLocal<Integer> { @@ -116,7 +113,7 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable { getSessionHandle().getHandleIdentifier().getSecretId())); persistInfo.setUsername(getUserName()); persistInfo.setPassword(getPassword()); - persistInfo.setLastAccessTime(lastAccessTime); + persistInfo.setLastAccessTime(System.currentTimeMillis()); persistInfo.setSessionConf(sessionConf); if (sessionConf != null) { for (Map.Entry<String, String> entry : sessionConf.entrySet()) { @@ -280,12 +277,17 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable { } public boolean isActive() { - return System.currentTimeMillis() - lastAccessTime < sessionTimeout - && (!persistInfo.markedForClose|| activeOperationsPresent()); + // session is active, if any active operations are present. + // If no active operations are present, session is active if timeout is not reached and session is not + // marked for close + return activeOperationsPresent() || ((System.currentTimeMillis() - persistInfo.lastAccessTime < sessionTimeout) + && !persistInfo.markedForClose); } + public boolean isMarkedForClose() { return persistInfo.isMarkedForClose(); } + public synchronized void setActive() { setLastAccessTime(System.currentTimeMillis()); } @@ -468,12 +470,12 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable { return persistInfo; } - void setLastAccessTime(long lastAccessTime) { - this.lastAccessTime = lastAccessTime; + public void setLastAccessTime(long lastAccessTime) { + persistInfo.lastAccessTime = lastAccessTime; } public long getLastAccessTime() { - return lastAccessTime; + return persistInfo.lastAccessTime; } /** http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java index 8c1bb7b..017584c 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java @@ -22,10 +22,7 @@ import static org.apache.lens.server.api.LensConfConstants.*; import static org.testng.Assert.*; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import javax.ws.rs.core.Application; import javax.ws.rs.core.MediaType; @@ -33,6 +30,7 @@ import javax.ws.rs.core.Response; import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.query.LensQuery; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.result.LensAPIResult; @@ -41,6 +39,7 @@ import org.apache.lens.driver.hive.HiveDriver; import org.apache.lens.server.LensJerseyTest; import org.apache.lens.server.LensServerTestUtil; import org.apache.lens.server.LensServices; +import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.LensServerAPITestUtil; import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.error.LensException; @@ -51,6 +50,9 @@ import org.apache.lens.server.common.RestAPITestUtil; import org.apache.lens.server.common.TestResourceFile; import org.apache.lens.server.error.LensServerErrorCode; import org.apache.lens.server.session.HiveSessionService; +import org.apache.lens.server.session.LensSessionImpl; + +import org.apache.hadoop.hive.conf.HiveConf; import org.glassfish.jersey.test.TestProperties; import org.testng.annotations.*; @@ -111,10 +113,6 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { QUERY_PERSISTENT_RESULT_INDRIVER, true, QUERY_OUTPUT_FORMATTER, TestQueryService.DeferredPersistentResultFormatter.class.getName()); } - @AfterClass - public void restart() { - restartLensServer(); - } @Override public Map<String, String> getServerConfOverWrites() { @@ -150,10 +148,17 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { private void customRestartLensServer() { queryService = null; - super.restartLensServer(getServerConf(), false); + super.restartLensServer(getServerConf()); getQueryService(); } + private void restartLensServerWithLowerExpiry() { + sessionService = null; + HiveConf hconf = new HiveConf(getServerConf()); + hconf.setLong(LensConfConstants.SESSION_TIMEOUT_SECONDS, 1L); + super.restartLensServer(hconf); + getSessionService(); + } /* * (non-Javadoc) * @@ -277,6 +282,54 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { return sessions; } + @Test + public void testSessionExpiryWithActiveOperation() throws Exception { + LensSessionHandle oldSession = getSession(); + assertTrue(sessionService.getSession(oldSession).isActive()); + restartLensServerWithLowerExpiry(); + assertFalse(sessionService.getSession(oldSession).isActive()); + // create a new session and launch a query + LensSessionHandle sessionHandle = getSession(); + LensSessionImpl session = sessionService.getSession(sessionHandle); + QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(), + Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), defaultMT); + assertTrue(session.isActive()); + session.setLastAccessTime( + session.getLastAccessTime() - 2000 * getServerConf().getLong(LensConfConstants.SESSION_TIMEOUT_SECONDS, + LensConfConstants.SESSION_TIMEOUT_SECONDS_DEFAULT)); + assertTrue(session.isActive()); + assertFalse(session.isMarkedForClose()); + + LensSessionHandle sessionHandle2 = getSession(); + LensQuery ctx = RestAPITestUtil.getLensQuery(target(), sessionHandle2, handle, defaultMT); + while (!ctx.getStatus().finished()) { + ctx = RestAPITestUtil.getLensQuery(target(), sessionHandle2, handle, defaultMT); + Thread.sleep(1000); + sessionHandle2 = getSession(); + } + assertEquals(ctx.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL, String.valueOf(ctx)); + assertFalse(session.isActive()); + assertFalse(session.isMarkedForClose()); + + // run the expiry thread + sessionService.getSessionExpiryRunnable().run(); + try { + sessionService.getSession(sessionHandle); + // should throw exception since session should be expired by now + fail("Expected get session to fail for session " + sessionHandle.getPublicId()); + } catch (Exception e) { + // pass + } + try { + sessionService.getSession(oldSession); + // should throw exception since session should be expired by now + fail("Expected get session to fail for session " + oldSession.getPublicId()); + } catch (Exception e) { + // pass + } + restartLensServer(); + lensSessionId = getSession(); + } @AfterMethod private void waitForPurge() throws InterruptedException { waitForPurge(0, getQueryService().finishedQueries);