Author: cziegeler
Date: Wed Jan  5 09:25:28 2011
New Revision: 1055369

URL: http://svn.apache.org/viewvc?rev=1055369&view=rev
Log:
SLING-1917 : Make Locking Strategy Configurable (for Cluster Usage)

Added:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
   (with props)
Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java

Added: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java?rev=1055369&view=auto
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
 (added)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
 Wed Jan  5 09:25:28 2011
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.QueryResult;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.support.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(metatype=true,label="%lm.name",description="%jlm.description")
+@Services({
+    @Service(value=Runnable.class),
+    @Service(value=LockManager.class)
+})
+@Properties({
+    @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
+    @Property(name="scheduler.concurrent", boolValue=false, 
propertyPrivate=true)
+})
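+/**
+ * The lock manager handles locking and unlocking nodes.
+ * It can be configured to handle locks in different ways:
+ * "session" creates session scoped locks, "open" creates open scoped locks
+ * and "none" does not lock at all.
+ */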
+public class LockManager implements Runnable, EventListener {
+
+    /** Default repository path. */
+    private static final String DEFAULT_REPOSITORY_PATH = 
"/var/eventing/cluster";
+
+    /** Modes */
+    private static final String MODE_SESSION = "session";
+    private static final String MODE_OPEN = "open";
+    private static final String MODE_NONE = "none";
+
+    /** Default lock mode. */
+    private static final String DEFAULT_MODE = MODE_SESSION;
+
+    /** Property to be updated by the heartbeat. */
+    private static final String LAST_MODIFIED_PROP = "lastModified";
+
+    /** Nodetype for heartbeat nodes. */
+    private static final String NODE_TYPE = "nt:unstructured";
+
+    /** Lock info prefix. */
+    private static final String OWNER_PREFIX = "SlingVersioningManager:";
+
+    /** The path where all beats are stored. */
+    @Property(value=DEFAULT_REPOSITORY_PATH, propertyPrivate=true)
+    private static final String CONFIG_PROPERTY_REPOSITORY_PATH = 
"repository.path";
+
+    /** Lock mode. */
+    @Property(value=DEFAULT_MODE,
+            options={@PropertyOption(name=MODE_SESSION,value="Session Scoped"),
+                     @PropertyOption(name=MODE_OPEN,value="Open Scoped"),
+                     @PropertyOption(name=MODE_NONE,value="None")})
+    private static final String CONFIG_PROPERTY_MODE = "lm.mode";
+
+    /**
+     * The supported lock modes. The constant names are resolved from the
+     * configured mode string via {@code LockMode.valueOf} in {@code update}.
+     */
+    private static enum LockMode {
+        /** Locks are created session scoped. */
+        session,
+        /** Locks are created open scoped; the periodic job and the observation listener are active in this mode. */
+        open,
+        /** Lock and unlock requests are ignored. */
+        none
+    };
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Last modified map. */
+    private final Map<String, Long> lastModifiedMap = new HashMap<String, 
Long>();
+
+    /** Flag, indicating that this service is running. */
+    private boolean running;
+
+    /** Lock for the background session. */
+    private final Object backgroundLock = new Object();
+
+    /** Background session. */
+    private Session backgroundSession;
+
+    @Reference
+    private EnvironmentComponent environment;
+
+    /** The repository path. */
+    private String repositoryPath;
+
+    /** The id node path. */
+    private String idNodePath;
+
+    /** Lock mode .*/
+    private LockMode mode;
+
+    /**
+     * Activate this component.
+     * Creates the background session, writes the first heartbeat, registers
+     * this instance as an observation listener below the repository path,
+     * releases locks left over from this instance, records the heartbeats of
+     * the other instances and finally applies the configured lock mode.
+     *
+     * @param props The configuration properties.
+     * @throws RepositoryException If the session can not be created or the listener can not be registered.
+     */
+    @Activate
+    protected void activate(final Map<String, Object> props) throws RepositoryException {
+        this.repositoryPath = OsgiUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+        this.idNodePath = repositoryPath + '/' + Environment.APPLICATION_ID;
+
+        // create the background session and register a listener
+        this.backgroundSession = this.environment.createAdminSession();
+        try {
+            this.updateLastModified();
+            this.backgroundSession.getWorkspace().getObservationManager().addEventListener(this,
+                    javax.jcr.observation.Event.PROPERTY_CHANGED
+                    |javax.jcr.observation.Event.NODE_ADDED,
+                    this.repositoryPath,
+                    true,
+                    null,
+                    null,
+                    true);
+        } catch (final RepositoryException re) {
+            // activation is aborted by rethrowing, so release the session here
+            // instead of relying on deactivate() to do it
+            this.backgroundSession.logout();
+            this.backgroundSession = null;
+            throw re;
+        }
+        logger.info("Apache Sling Event Lock Manager started on instance {}", Environment.APPLICATION_ID);
+        // use the same monitor as every other user of the background session
+        synchronized ( this.backgroundLock ) {
+            this.unlock(Environment.APPLICATION_ID);
+        }
+        this.scanExistingNodes();
+
+        this.update(props);
+    }
+
+    /**
+     * Deactivate this component.
+     * Stops the periodic processing, removes the observation listener and
+     * logs out the background session.
+     */
+    @Deactivate
+    protected void deactivate() {
+        this.running = false;
+        // check and clear the session under the lock that guards it
+        synchronized ( this.backgroundLock ) {
+            if ( this.backgroundSession != null ) {
+                this.logger.debug("Shutting down background session.");
+                try {
+                    this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+                } catch (RepositoryException e) {
+                    // we just ignore it
+                    this.logger.warn("Unable to remove event listener.", e);
+                }
+                this.backgroundSession.logout();
+                this.backgroundSession = null;
+            }
+        }
+        logger.info("Apache Sling Event Lock Manager stopped on instance {}", Environment.APPLICATION_ID);
+    }
+
+    /**
+     * Apply a (changed) configuration.
+     * Reads the lock mode from the properties; an unknown value is logged and
+     * replaced by the default mode instead of failing the component. The
+     * running flag is only recalculated if the mode changed; it is set only
+     * for the open mode.
+     *
+     * @param props The configuration properties.
+     */
+    @Modified
+    protected void update(final Map<String, Object> props) {
+        final LockMode oldMode = this.mode;
+        final String modeString = OsgiUtil.toString(props.get(CONFIG_PROPERTY_MODE), DEFAULT_MODE);
+        LockMode newMode;
+        try {
+            newMode = LockMode.valueOf(modeString);
+        } catch (final IllegalArgumentException iae) {
+            // valueOf rejects everything that is not an exact constant name
+            logger.warn("Invalid lock mode '{}', using default mode '{}'.", modeString, DEFAULT_MODE);
+            newMode = LockMode.valueOf(DEFAULT_MODE);
+        }
+        this.mode = newMode;
+        if ( oldMode != newMode ) {
+            this.running = newMode == LockMode.open;
+        }
+    }
+
+    /**
+     * Returns the {@link javax.jcr.Node Node} at the given path below the root
+     * node of the background session, creating it if it does not exist.
+     * Missing intermediate nodes are created one by one and saved immediately;
+     * the final node is only added to the session and not saved here. All
+     * created nodes use the node type {@code NODE_TYPE}.
+     *
+     * @param relativePath The path relative to the root node
+     * @return the Node at path
+     * @throws RepositoryException in case of exception accessing the Repository
+     */
+    private Node createPath(String relativePath)
+    throws RepositoryException {
+        final Node root = this.backgroundSession.getRootNode();
+        if ( root.hasNode(relativePath) ) {
+            return root.getNode(relativePath);
+        }
+
+        Node current = root;
+        final int lastSlash = relativePath.lastIndexOf('/');
+        if ( lastSlash != -1 ) {
+            // walk down the parent segments, creating the missing ones
+            final StringTokenizer segments = new StringTokenizer(relativePath.substring(0, lastSlash), "/");
+            while ( segments.hasMoreTokens() ) {
+                final String segment = segments.nextToken();
+                if ( !current.hasNode(segment) ) {
+                    try {
+                        current.addNode(segment, NODE_TYPE);
+                        current.getSession().save();
+                    } catch (RepositoryException re) {
+                        // we ignore this as this folder might be created from a different task
+                        current.refresh(false);
+                    }
+                }
+                current = current.getNode(segment);
+            }
+            relativePath = relativePath.substring(lastSlash + 1);
+        }
+        if ( !current.hasNode(relativePath) ) {
+            current.addNode(relativePath, NODE_TYPE);
+        }
+        return current.getNode(relativePath);
+    }
+
+    /**
+     * Write the heartbeat of this instance: store the current time in the
+     * last modified property of this instance's id node and save the
+     * background session. Repository exceptions are only logged.
+     */
+    private void updateLastModified() {
+        synchronized ( this.backgroundLock ) {
+            try {
+                // the id node path is absolute, createPath works relative to the root node
+                final String relativeIdPath = this.idNodePath.substring(1);
+                final Node heartbeatNode = this.createPath(relativeIdPath);
+                final long now = System.currentTimeMillis();
+                heartbeatNode.setProperty(LAST_MODIFIED_PROP, now);
+                this.backgroundSession.save();
+                logger.debug("Heartbeat at {}", Environment.APPLICATION_ID);
+            } catch (final RepositoryException re) {
+                this.ignoreException(re);
+            }
+        }
+    }
+
+    /**
+     * Read the heartbeats already stored below the repository path.
+     * Every child node other than the one of this instance that carries a
+     * last modified property is recorded in the last modified map.
+     */
+    private void scanExistingNodes() {
+        synchronized ( this.backgroundLock ) {
+            try {
+                final NodeIterator children = this.backgroundSession.getNode(this.repositoryPath).getNodes();
+                while ( children.hasNext() ) {
+                    final Node child = children.nextNode();
+                    final String instanceId = child.getName();
+                    // skip our own node and nodes without a heartbeat
+                    if ( Environment.APPLICATION_ID.equals(instanceId) || !child.hasProperty(LAST_MODIFIED_PROP) ) {
+                        continue;
+                    }
+                    final javax.jcr.Property heartbeat = child.getProperty(LAST_MODIFIED_PROP);
+                    logger.debug("Updated heartbeat from {}", instanceId);
+                    this.lastModifiedMap.put(instanceId, heartbeat.getLong());
+                }
+            } catch (final RepositoryException re) {
+                this.ignoreException(re);
+            }
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e Exception to ignore
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Periodic job: refresh the heartbeat of this instance and unlock the
+     * nodes of every instance whose last recorded heartbeat is at least
+     * 120000 milliseconds old. Handled instances are marked with -1 so they
+     * are not processed again until a new heartbeat is recorded.
+     * Does nothing while the running flag is not set.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        if ( !this.running ) {
+            return;
+        }
+        // we update last modified
+        this.updateLastModified();
+        final long staleThreshold = System.currentTimeMillis() - 120000;
+        synchronized ( this.backgroundLock ) {
+            for(final Map.Entry<String, Long> heartbeat : this.lastModifiedMap.entrySet() ) {
+                final long lastSeen = heartbeat.getValue();
+                if ( lastSeen == -1 ) {
+                    // already handled
+                    continue;
+                }
+                logger.debug("Checking cluster node {}", heartbeat.getKey());
+                if ( lastSeen <= staleThreshold ) {
+                    this.unlock(heartbeat.getKey());
+                    heartbeat.setValue(-1L);
+                }
+            }
+        }
+    }
+
+    /**
+     * Search all nodes whose lock owner property matches the given id and
+     * unlock those which are still locked. Repository exceptions are only
+     * logged; a failure on one node does not stop the remaining nodes from
+     * being processed.
+     * @param id The sling id
+     */
+    private void unlock(final String id) {
+        logger.info("Trying to unlock {}", id);
+        try {
+            final String searchString = OWNER_PREFIX + id;
+            // ids are read from repository node names, so double any single quote
+            // to keep the value a well-formed JCR-SQL2 string literal
+            final String searchLiteral = searchString.replace("'", "''");
+
+            final QueryManager qm = this.backgroundSession.getWorkspace().getQueryManager();
+            final Query q = qm.createQuery("select * from [nt:base] where ["
+                    + JCRHelper.NODE_PROPERTY_LOCK_OWNER + "] = '" + searchLiteral + "'",
+                    Query.JCR_SQL2);
+            final QueryResult qr = q.execute();
+            final NodeIterator nI = qr.getNodes();
+            while ( nI.hasNext() ) {
+                final Node node = nI.nextNode();
+                try {
+                    // re-check on the node itself, the query result might be outdated
+                    if ( node.hasProperty(JCRHelper.NODE_PROPERTY_LOCK_OWNER) ) {
+                        if ( node.isLocked()
+                             && node.getProperty(JCRHelper.NODE_PROPERTY_LOCK_OWNER).getString().endsWith(searchString) ) {
+                            logger.debug("Trying to unlock node {} from {}", node.getPath(), id);
+                            this.backgroundSession.getWorkspace().getLockManager().unlock(node.getPath());
+                        }
+                    }
+                } catch (final RepositoryException re) {
+                    this.ignoreException(re);
+                }
+            }
+        } catch (final RepositoryException re) {
+            this.ignoreException(re);
+        }
+    }
+
+    /**
+     * @see 
javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(final EventIterator events) {
+        synchronized ( this.backgroundLock ) {
+            while ( events.hasNext() ) {
+                final Event event = events.nextEvent();
+                if ( this.running ) {
+                    try {
+                        final String path = event.getType() == 
javax.jcr.observation.Event.NODE_ADDED
+                                       ? event.getPath() + '/' + 
LAST_MODIFIED_PROP : event.getPath();
+                        if ( this.backgroundSession.propertyExists(path) ) {
+                            final javax.jcr.Property prop = 
this.backgroundSession.getProperty(path);
+                            final String id = prop.getParent().getName();
+                            logger.debug("Updated heartbeat from {}", id);
+                            this.lastModifiedMap.put(id, prop.getLong());
+                        }
+                    } catch (final RepositoryException re) {
+                        this.ignoreException(re);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lock the node at the given path.
+     * The lock is not deep, uses the maximum timeout hint and carries the id
+     * of this instance as owner info. It is session scoped in session mode
+     * and open scoped in open mode; in mode none nothing is locked.
+     * @param session The session to create the lock with
+     * @param path The path to the node to lock
+     * @throws RepositoryException If anything goes wrong
+     */
+    public void lock(final Session session, final String path) throws RepositoryException {
+        // the mode is only assigned in update(); without this fallback an instance
+        // that has not been configured yet would silently create open scoped locks
+        final LockMode effectiveMode = this.mode != null ? this.mode : LockMode.valueOf(DEFAULT_MODE);
+        if ( effectiveMode != LockMode.none ) {
+            session.getWorkspace().getLockManager().lock(path, false,
+                    effectiveMode == LockMode.session, Long.MAX_VALUE,
+                    OWNER_PREFIX + Environment.APPLICATION_ID);
+        }
+    }
+
+    /**
+     * Unlock the node at the given path.
+     * Nothing is done if the lock mode is none.
+     * @param session The session for unlocking
+     * @param path The path to the node to unlock
+     * @throws RepositoryException If anything goes wrong
+     */
+    public void unlock(final Session session, final String path)
+    throws RepositoryException {
+        if ( this.mode == LockMode.none ) {
+            return;
+        }
+        final javax.jcr.lock.LockManager jcrLockManager = session.getWorkspace().getLockManager();
+        jcrLockManager.unlock(path);
+    }
+}

Propchange: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1055369&r1=1055368&r2=1055369&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
 Wed Jan  5 09:25:28 2011
@@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory;
             value={"org/osgi/framework/BundleEvent/UPDATED",
                    "org/osgi/framework/BundleEvent/STARTED",
                    JobUtil.TOPIC_JOB}),
-     @Property(name="scheduler.period", 
longValue=300,label="%persscheduler.period.name",description="%persscheduler.period.description"),
+     @Property(name="scheduler.period", longValue=300,
+               label="%persscheduler.period.name",
+               description="%persscheduler.period.description"),
      @Property(name="scheduler.concurrent", boolValue=false, 
propertyPrivate=true)
 })
 public class PersistenceHandler implements EventListener, Runnable, 
EventHandler {
@@ -96,7 +98,9 @@ public class PersistenceHandler implemen
     /** Default clean up time is 5 minutes. */
     private static final int DEFAULT_CLEANUP_PERIOD = 5;
 
-    
@Property(intValue=DEFAULT_CLEANUP_PERIOD,label="%jobcleanup.period.name",description="%jobcleanup.period.description")
+    @Property(intValue=DEFAULT_CLEANUP_PERIOD,
+              label="%jobcleanup.period.name",
+              description="%jobcleanup.period.description")
     private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = 
"cleanup.period";
 
     /** Default maximum load jobs. */
@@ -157,6 +161,9 @@ public class PersistenceHandler implemen
     @Reference
     private JobManager jobManager;
 
+    @Reference
+    private LockManager lockManager;
+
     /**
      * Activate this component.
      * @param context The component context.
@@ -900,7 +907,7 @@ public class PersistenceHandler implemen
                     if ( !eventNode.isLocked() ) {
                         // lock node
                         try {
-                            
this.backgroundSession.getWorkspace().getLockManager().lock(path, false, true, 
Long.MAX_VALUE, "JobEventHandler:" + Environment.APPLICATION_ID);
+                            this.lockManager.lock(this.backgroundSession, 
path);
                         } catch (RepositoryException re) {
                             // lock failed which means that the node is locked 
by someone else, so we don't have to requeue
                             return false;
@@ -945,7 +952,7 @@ public class PersistenceHandler implemen
                 return;
             }
             try {
-                
this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+                this.lockManager.unlock(this.backgroundSession, path);
             } catch (RepositoryException re) {
                 // there is nothing we can do
                 this.ignoreException(re);
@@ -964,8 +971,8 @@ public class PersistenceHandler implemen
                 return;
             }
             try {
+                
((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
                 if ( this.backgroundSession.itemExists(path) ) {
-                    
((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
                     final Node eventNode = 
(Node)this.backgroundSession.getItem(path);
                     if ( jobId == null ) {
                         // simply remove the node
@@ -977,7 +984,7 @@ public class PersistenceHandler implemen
                     this.backgroundSession.save();
                     // and unlock
                     if ( jobId != null && eventNode.isLocked() ) {
-                        
this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+                        this.lockManager.unlock(this.backgroundSession, path);
                     }
                 }
             } catch (RepositoryException re) {
@@ -1043,7 +1050,7 @@ public class PersistenceHandler implemen
                     this.backgroundSession.save();
 
                     // and unlock
-                    
this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+                    this.lockManager.unlock(this.backgroundSession, path);
                     return true;
                 }
             } catch (RepositoryException re) {
@@ -1051,6 +1058,7 @@ public class PersistenceHandler implemen
                 this.ignoreException(re);
             }
         }
+        ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
         return false;
     }
 

Modified: 
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1055369&r1=1055368&r2=1055369&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
 Wed Jan  5 09:25:28 2011
@@ -162,3 +162,18 @@ minPoolSize.description = The size of th
 
 priority.name = Priority
 priority.description = The priority for the threads from this pool. Default is 
norm.
+
+#
+# Lock Manager
+lm.name = Apache Sling Event Lock Manager
+lm.description = This service is responsible for locking and unlocking the event 
nodes. Depending \
+ on the environment, special configuration can improve the performance.
+ 
+lm.mode.name = Lock Mode
+lm.mode.description = The lock mode defines how the events are locked in the 
repository. The default \
+ is to use session scoped locks. With session scoped locks it's the task of 
the repository to propagate \
+ unlocks in a cluster if a session/cluster node dies. When open scoped locks 
are used, the lock manager \
+ takes care to propagate this information. Please note, that Apache Jackrabbit 
currently does not support \
+ session scoped locks in a cluster and the security is too strong when it 
comes to open scoped locks. \
+ The setting none should only be used, if no cluster is used or if by other 
means it is guaranteed that \
+ only a single node in the cluster is processing jobs.

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java?rev=1055369&r1=1055368&r2=1055369&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
 Wed Jan  5 09:25:28 2011
@@ -25,6 +25,7 @@ import junitx.util.PrivateAccessor;
 import org.apache.sling.event.impl.AbstractTest;
 import org.apache.sling.event.impl.SimpleScheduler;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.jcr.LockManager;
 import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
 import org.jmock.Expectations;
 import org.jmock.integration.junit4.JMock;
@@ -57,6 +58,7 @@ public abstract class AbstractJobEventHa
         this.handler = new PersistenceHandler();
         PrivateAccessor.setField(this.handler, "environment", 
this.environment);
         PrivateAccessor.setField(this.handler, "jobManager", this.jobManager);
+        PrivateAccessor.setField(this.handler, "lockManager", new 
LockManager());
 
         // lets set up the bundle context
         final BundleContext bundleContext = 
this.getMockery().mock(BundleContext.class, "beforeBundleContext" + 
activateCount);


Reply via email to