This is an automated email from the ASF dual-hosted git repository. bross pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 1ed0bf31a985ed93c0bf957d1ccd0abbb3de7e2a Author: Sarah <sab...@pivotal.io> AuthorDate: Fri Apr 9 11:35:35 2021 -0400 Revert "GEODE-8513: Remove (de)serialization of local sessions." This reverts commit 5eb8521253986c06b52ff02d00086380626bb262. --- .../catalina/DeltaSessionManagerJUnitTest.java | 223 ++++++++++++- .../session/catalina/DeltaSessionManager.java | 356 ++++++++++++++++++++- 2 files changed, 565 insertions(+), 14 deletions(-) diff --git a/extensions/geode-modules-test/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManagerJUnitTest.java b/extensions/geode-modules-test/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManagerJUnitTest.java index 835575b..fec8571 100644 --- a/extensions/geode-modules-test/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManagerJUnitTest.java +++ b/extensions/geode-modules-test/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManagerJUnitTest.java @@ -17,28 +17,50 @@ package org.apache.geode.modules.session.catalina; import static org.apache.geode.modules.util.RegionConfiguration.DEFAULT_MAX_INACTIVE_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; import java.beans.PropertyChangeEvent; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.HashSet; import java.util.Set; import javax.servlet.http.HttpSession; import org.apache.catalina.Context; +import org.apache.catalina.Loader; import org.apache.catalina.Session; +import org.apache.catalina.session.StandardSession; import org.apache.juli.logging.Log; import org.junit.Test; import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.FunctionDomainException; +import org.apache.geode.cache.query.NameResolutionException; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryInvocationTargetException; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.TypeMismatchException; +import org.apache.geode.cache.query.internal.InternalQueryService; +import org.apache.geode.cache.query.internal.LinkedResultSet; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.modules.session.catalina.internal.DeltaSessionStatistics; @@ -195,15 +217,142 @@ public abstract class DeltaSessionManagerJUnitTest { } @Test + public void loadActivatesAndAddsSingleSessionWithValidIdAndMoreRecentAccessTime() + throws IOException, ClassNotFoundException { + String contextPath = "contextPath"; + String expectedStoreDir = ""; + DeltaSession newSession = mock(DeltaSession.class); + DeltaSession existingSession = mock(DeltaSession.class); + + prepareMocksForLoadTest(contextPath, newSession, existingSession, expectedStoreDir); + + manager.load(); + + verify(newSession).activate(); + verify(manager).add(newSession); + } + + @Test + public void loadLogsWarningAndDoesNotAddSessionWhenSessionStoreNotFound() + throws IOException, ClassNotFoundException { + String contextPath = "contextPath"; + String expectedStoreDir = ""; + DeltaSession newSession = mock(DeltaSession.class); + DeltaSession existingSession = mock(DeltaSession.class); + + prepareMocksForLoadTest(contextPath, newSession, existingSession, expectedStoreDir); + + doReturn(null).when(manager).getFileAtPath(any(), any()); + + manager.load(); + + verify(logger).debug("No session store file found"); + verify(manager, times(0)).add(any()); + } + + @Test + public void loadDoesNotAddSessionToManagerWithValidIdAndLessRecentAccessTime() + throws IOException, ClassNotFoundException { + String contextPath = "contextPath"; + String expectedStoreDir = ""; + DeltaSession newSession = mock(DeltaSession.class); + DeltaSession existingSession = mock(DeltaSession.class); + + prepareMocksForLoadTest(contextPath, newSession, existingSession, expectedStoreDir); + + when(existingSession.getLastAccessedTime()).thenReturn(2L); + + manager.load(); + + verify(newSession, times(0)).activate(); + verify(manager, times(0)).add(newSession); + } + + @Test + public void unloadWritesSingleSessionToDiskWhenIdIsValid() + throws IOException, NameResolutionException, TypeMismatchException, + QueryInvocationTargetException, FunctionDomainException { + String sessionId = "sessionId"; + DeltaSession session = mock(DeltaSession.class); + FileOutputStream fos = mock(FileOutputStream.class); + BufferedOutputStream bos = mock(BufferedOutputStream.class); + ObjectOutputStream oos = mock(ObjectOutputStream.class); + + prepareMocksForUnloadTest(sessionId, fos, bos, oos, session); + + manager.unload(); + + verify((StandardSession) session).writeObjectData(oos); + } + + @Test + public void unloadDoesNotWriteSessionToDiskAndClosesOutputStreamsWhenOutputStreamThrowsIOException() + throws IOException, NameResolutionException, TypeMismatchException, + QueryInvocationTargetException, FunctionDomainException { + String sessionId = "sessionId"; + DeltaSession session = mock(DeltaSession.class); + FileOutputStream fos = mock(FileOutputStream.class); + BufferedOutputStream bos = mock(BufferedOutputStream.class); + ObjectOutputStream oos = mock(ObjectOutputStream.class); + + prepareMocksForUnloadTest(sessionId, fos, bos, oos, session); + + String exceptionMessage = "Output Stream IOException"; + + IOException exception = new IOException(exceptionMessage); + + doThrow(exception).when(manager).getObjectOutputStream(bos); + + assertThatThrownBy(() -> manager.unload()).isInstanceOf(IOException.class) + .hasMessage(exceptionMessage); + + verify((StandardSession) session, times(0)).writeObjectData(oos); + verify(bos).close(); + verify(fos).close(); + } + + @Test + public void unloadDoesNotWriteSessionToDiskAndClosesOutputStreamsWhenSessionIsWrongClass() + throws IOException, NameResolutionException, TypeMismatchException, + QueryInvocationTargetException, FunctionDomainException { + String sessionId = "sessionId"; + DeltaSession session = mock(DeltaSession.class); + FileOutputStream fos = mock(FileOutputStream.class); + BufferedOutputStream bos = mock(BufferedOutputStream.class); + ObjectOutputStream oos = mock(ObjectOutputStream.class); + + prepareMocksForUnloadTest(sessionId, fos, bos, oos, session); + + Session invalidSession = + mock(Session.class, withSettings().extraInterfaces(DeltaSessionInterface.class)); + + doReturn(invalidSession).when(manager).findSession(sessionId); + + assertThatThrownBy(() -> manager.unload()).isInstanceOf(IOException.class); + + verify((StandardSession) session, times(0)).writeObjectData(oos); + verify(oos).close(); + } + + @Test public void successfulUnloadWithClientServerSessionCachePerformsLocalDestroy() - throws IOException { - when(sessionCache.getCache()).thenReturn(cache); - when(context.getPath()).thenReturn("contextPath"); + throws IOException, NameResolutionException, TypeMismatchException, + QueryInvocationTargetException, FunctionDomainException { + String sessionId = "sessionId"; + DeltaSession session = mock(DeltaSession.class); + FileOutputStream fos = mock(FileOutputStream.class); + BufferedOutputStream bos = mock(BufferedOutputStream.class); + ObjectOutputStream oos = mock(ObjectOutputStream.class); + + prepareMocksForUnloadTest(sessionId, fos, bos, oos, session); + when(sessionCache.isClientServer()).thenReturn(true); + when(session.getId()).thenReturn(sessionId); manager.unload(); - verify(operatingRegion).localClear(); + verify((StandardSession) session).writeObjectData(oos); + verify(operatingRegion).localDestroy(sessionId); } @Test @@ -253,4 +402,70 @@ public abstract class DeltaSessionManagerJUnitTest { verify(manager).setMaxInactiveInterval(oldValue); } + + public void prepareMocksForUnloadTest(String sessionId, FileOutputStream fos, + BufferedOutputStream bos, ObjectOutputStream oos, DeltaSession session) + throws NameResolutionException, TypeMismatchException, QueryInvocationTargetException, + FunctionDomainException, IOException { + String regionName = "regionName"; + String contextPath = "contextPath"; + String catalinaBaseSystemProp = "Catalina/Base"; + String systemFileSeparator = "/"; + String expectedStoreDir = catalinaBaseSystemProp + systemFileSeparator + "temp"; + + InternalQueryService queryService = mock(InternalQueryService.class); + Query query = mock(Query.class); + File store = mock(File.class); + SelectResults results = new LinkedResultSet(); + + when(sessionCache.getCache()).thenReturn(cache); + when(context.getPath()).thenReturn(contextPath); + when(cache.getQueryService()).thenReturn(queryService); + when(queryService.newQuery(anyString())).thenReturn(query); + when(query.execute()).thenReturn(results); + doReturn(catalinaBaseSystemProp).when(manager) + .getSystemPropertyValue(DeltaSessionManager.catalinaBaseSystemProperty); + doReturn(systemFileSeparator).when(manager) + .getSystemPropertyValue(DeltaSessionManager.fileSeparatorSystemProperty); + doReturn(store).when(manager).getFileAtPath(expectedStoreDir, contextPath); + doReturn(fos).when(manager).getFileOutputStream(store); + doReturn(bos).when(manager).getBufferedOutputStream(fos); + doReturn(oos).when(manager).getObjectOutputStream(bos); + doReturn(regionName).when(manager).getRegionName(); + doReturn(session).when(manager).findSession(sessionId); + doNothing().when(manager).writeToObjectOutputStream(any(), any()); + + results.add(sessionId); + } + + public void prepareMocksForLoadTest(String contextPath, DeltaSession newSession, + DeltaSession existingSession, String expectedStoreDir) + throws IOException, ClassNotFoundException { + String catalinaBaseSystemProp = "Catalina/Base"; + String systemFileSeparator = "/"; + expectedStoreDir = catalinaBaseSystemProp + systemFileSeparator + "temp"; + String newSessionId = "newSessionId"; + + File store = mock(File.class); + FileInputStream fis = mock(FileInputStream.class); + BufferedInputStream bis = mock(BufferedInputStream.class); + ObjectInputStream ois = mock(ObjectInputStream.class); + Loader loader = mock(Loader.class); + + when(context.getPath()).thenReturn(contextPath); + when(context.getLoader()).thenReturn(loader); + when(newSession.getId()).thenReturn(newSessionId); + when(newSession.getLastAccessedTime()).thenReturn(1L); + when(newSession.isValid()).thenReturn(true); + when(existingSession.getLastAccessedTime()).thenReturn(0L); + doReturn(catalinaBaseSystemProp).when(manager).getSystemPropertyValue("catalina.base"); + doReturn(systemFileSeparator).when(manager).getSystemPropertyValue("file.separator"); + doReturn(store).when(manager).getFileAtPath(expectedStoreDir, contextPath); + doReturn(fis).when(manager).getFileInputStream(store); + doReturn(bis).when(manager).getBufferedInputStream(fis); + doReturn(ois).when(manager).getObjectInputStream(bis); + doReturn(1).when(manager).getSessionCountFromObjectInputStream(ois); + doReturn(newSession).when(manager).getNewSession(); + doReturn(existingSession).when(operatingRegion).get(newSessionId); + } } diff --git a/extensions/geode-modules/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManager.java b/extensions/geode-modules/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManager.java index ec49b5d..2af36fe 100644 --- a/extensions/geode-modules/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManager.java +++ b/extensions/geode-modules/src/main/java/org/apache/geode/modules/session/catalina/DeltaSessionManager.java @@ -16,10 +16,20 @@ package org.apache.geode.modules.session.catalina; import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -30,16 +40,23 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.catalina.Container; import org.apache.catalina.Context; import org.apache.catalina.Lifecycle; +import org.apache.catalina.Loader; import org.apache.catalina.Pipeline; import org.apache.catalina.Session; import org.apache.catalina.Valve; import org.apache.catalina.session.ManagerBase; import org.apache.catalina.session.StandardSession; +import org.apache.catalina.util.CustomObjectInputStream; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.modules.session.catalina.internal.DeltaSessionStatistics; import org.apache.geode.modules.util.ContextMapper; @@ -549,12 +566,13 @@ public abstract class DeltaSessionManager extends ManagerBase @Override public void load() throws ClassNotFoundException, IOException { + doLoad(); ContextMapper.addContext(getContextName(), this); } @Override public void unload() throws IOException { - clearLocalCache(); + doUnload(); ContextMapper.removeContext(getContextName()); } @@ -650,22 +668,340 @@ public abstract class DeltaSessionManager extends ManagerBase } /** - * Clear the local cache to avoid ClassCastException if container is being reloaded. + * Save any currently active sessions in the appropriate persistence mechanism, if any. If + * persistence is not supported, this method returns without doing anything. + * + * @throws IOException if an input/output error occurs */ - private void clearLocalCache() { - final Log logger = getLogger(); - final boolean debugEnabled = logger.isDebugEnabled(); + private void doUnload() throws IOException { + QueryService querySvc = getSessionCache().getCache().getQueryService(); + Context context = getTheContext(); + + if (context == null) { + return; + } + + String regionName; + if (getRegionName().startsWith("/")) { + regionName = getRegionName(); + } else { + regionName = "/" + getRegionName(); + } + + Query query = querySvc.newQuery("select s.id from " + regionName + + " as s where s.contextName = '" + context.getPath() + "'"); + + if (getLogger().isDebugEnabled()) { + getLogger().debug("Query: " + query.getQueryString()); + } + + SelectResults results; + try { + results = (SelectResults) query.execute(); + } catch (Exception ex) { + getLogger().error("Unable to perform query during doUnload", ex); + return; + } + + if (results.isEmpty()) { + getLogger().debug("No sessions to unload for context " + context.getPath()); + return; // nothing to do + } + // Open an output stream to the specified pathname, if any + File store = sessionStore(context.getPath()); + if (store == null) { + return; + } + if (getLogger().isDebugEnabled()) { + getLogger().debug("Unloading sessions to " + store.getAbsolutePath()); + } + FileOutputStream fos = null; + BufferedOutputStream bos = null; + ObjectOutputStream oos = null; + boolean error = false; + try { + fos = getFileOutputStream(store); + bos = getBufferedOutputStream(fos); + oos = getObjectOutputStream(bos); + } catch (IOException e) { + error = true; + getLogger().error("Exception unloading sessions", e); + throw e; + } finally { + if (error) { + if (oos != null) { + try { + oos.close(); + } catch (IOException ioe) { + // Ignore + } + } + if (bos != null) { + try { + bos.close(); + } catch (IOException ioe) { + // Ignore + } + } + if (fos != null) { + try { + fos.close(); + } catch (IOException ioe) { + // Ignore + } + } + } + } + + ArrayList<DeltaSessionInterface> list = new ArrayList<>(); + @SuppressWarnings("unchecked") + Iterator<String> elements = (Iterator<String>) results.iterator(); + while (elements.hasNext()) { + String id = elements.next(); + DeltaSessionInterface session = (DeltaSessionInterface) findSession(id); + if (session != null) { + list.add(session); + } + } + + // Write the number of active sessions, followed by the details + if (getLogger().isDebugEnabled()) + getLogger().debug("Unloading " + list.size() + " sessions"); + try { + writeToObjectOutputStream(oos, list); + for (DeltaSessionInterface session : list) { + if (session instanceof StandardSession) { + StandardSession standardSession = (StandardSession) session; + standardSession.passivate(); + standardSession.writeObjectData(oos); + } else { + // All DeltaSessionInterfaces as of Geode 1.0 should be based on StandardSession + throw new IOException("Session should be of type StandardSession"); + } + } + } catch (IOException e) { + getLogger().error("Exception unloading sessions", e); + try { + oos.close(); + } catch (IOException f) { + // Ignore + } + throw e; + } + + // Flush and close the output stream + try { + oos.flush(); + } finally { + try { + oos.close(); + } catch (IOException f) { + // Ignore + } + } + + // Locally destroy the sessions we just wrote if (getSessionCache().isClientServer()) { - if (debugEnabled) { - logger.debug("Locally clearing sessions."); + for (DeltaSessionInterface session : list) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Locally destroying session " + session.getId()); + } + try { + getSessionCache().getOperatingRegion().localDestroy(session.getId()); + } catch (EntryNotFoundException ex) { + // This can be thrown if an entry is evicted during or immediately after a session is + // written + // to disk. This isn't a problem, but the resulting exception messages can be confusing in + // testing + } + } + } + + if (getLogger().isDebugEnabled()) { + getLogger().debug("Unloading complete"); + } + } + + /** + * Load any currently active sessions that were previously unloaded to the appropriate persistence + * mechanism, if any. If persistence is not supported, this method returns without doing anything. + * + * @throws ClassNotFoundException if a serialized class cannot be found during the reload + * @throws IOException if an input/output error occurs + */ + private void doLoad() throws ClassNotFoundException, IOException { + Context context = getTheContext(); + if (context == null) { + return; + } + + // Open an input stream to the specified pathname, if any + File store = sessionStore(context.getPath()); + if (store == null) { + getLogger().debug("No session store file found"); + return; + } + if (getLogger().isDebugEnabled()) { + getLogger().debug("Loading sessions from " + store.getAbsolutePath()); + } + FileInputStream fis = null; + BufferedInputStream bis = null; + ObjectInputStream ois; + Loader loader = null; + ClassLoader classLoader = null; + try { + fis = getFileInputStream(store); + bis = getBufferedInputStream(fis); + if (getTheContext() != null) { + loader = getTheContext().getLoader(); + } + if (loader != null) { + classLoader = loader.getClassLoader(); + } + if (classLoader != null) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Creating custom object input stream for class loader"); + } + ois = new CustomObjectInputStream(bis, classLoader); + } else { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Creating standard object input stream"); + } + ois = getObjectInputStream(bis); + } + } catch (FileNotFoundException e) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("No persisted data file found"); } - getSessionCache().getOperatingRegion().localClear(); + return; + } catch (IOException e) { + getLogger().error("Exception loading sessions", e); + try { + fis.close(); + } catch (IOException f) { + // Ignore + } + try { + bis.close(); + } catch (IOException f) { + // Ignore + } + throw e; } - if (debugEnabled) { - logger.debug("Unloading complete"); + // Load the previously unloaded active sessions + try { + int n = getSessionCountFromObjectInputStream(ois); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Loading " + n + " persisted sessions"); + } + for (int i = 0; i < n; i++) { + StandardSession session = getNewSession(); + session.readObjectData(ois); + session.setManager(this); + + Region region = getSessionCache().getOperatingRegion(); + DeltaSessionInterface existingSession = (DeltaSessionInterface) region.get(session.getId()); + // Check whether the existing session is newer + if (existingSession != null + && existingSession.getLastAccessedTime() > session.getLastAccessedTime()) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Loaded session " + session.getId() + " is older than cached copy"); + } + continue; + } + + // Check whether the new session has already expired + if (!session.isValid()) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Loaded session " + session.getId() + " is invalid"); + } + continue; + } + + getLogger().debug("Loading session " + session.getId()); + session.activate(); + add(session); + } + } catch (ClassNotFoundException | IOException e) { + getLogger().error(e); + try { + ois.close(); + } catch (IOException f) { + // Ignore + } + throw e; + } finally { + // Close the input stream + try { + ois.close(); + } catch (IOException f) { + // ignored + } + + // Delete the persistent storage file + if (store.exists()) { + if (!store.delete()) { + getLogger().warn("Couldn't delete persistent storage file " + store.getAbsolutePath()); + } + } + } + } + + /** + * Return a File object representing the pathname to our persistence file, if any. + */ + private File sessionStore(String ctxPath) { + String storeDir = getSystemPropertyValue(catalinaBaseSystemProperty); + if (storeDir == null || storeDir.isEmpty()) { + storeDir = getSystemPropertyValue(javaTempDirSystemProperty); + } else { + storeDir += getSystemPropertyValue(fileSeparatorSystemProperty) + "temp"; } + + return getFileAtPath(storeDir, ctxPath); + } + + String getSystemPropertyValue(String propertyKey) { + return System.getProperty(propertyKey); + } + + File getFileAtPath(String storeDir, String ctxPath) { + return (new File(storeDir, ctxPath.replaceAll("/", "_") + ".sessions.ser")); + } + + FileInputStream getFileInputStream(File file) throws FileNotFoundException { + return new FileInputStream(file.getAbsolutePath()); + } + + BufferedInputStream getBufferedInputStream(FileInputStream fis) { + return new BufferedInputStream(fis); + } + + ObjectInputStream getObjectInputStream(BufferedInputStream bis) throws IOException { + return new ObjectInputStream(bis); + } + + FileOutputStream getFileOutputStream(File file) throws FileNotFoundException { + return new FileOutputStream(file.getAbsolutePath()); + } + + BufferedOutputStream getBufferedOutputStream(FileOutputStream fos) { + return new BufferedOutputStream(fos); + } + + ObjectOutputStream getObjectOutputStream(BufferedOutputStream bos) throws IOException { + return new ObjectOutputStream(bos); + } + + void writeToObjectOutputStream(ObjectOutputStream oos, List listToWrite) throws IOException { + oos.writeObject(listToWrite.size()); + } + + int getSessionCountFromObjectInputStream(ObjectInputStream ois) + throws IOException, ClassNotFoundException { + return (Integer) ois.readObject(); } @Override