Author: veithen
Date: Sun May 10 12:06:54 2009
New Revision: 773328
URL: http://svn.apache.org/viewvc?rev=773328&view=rev
Log:
WSCOMMONS-466: Implemented a more robust mechanism to track Axis services in
AbstractTransportListener.
Added:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceFilter.java
(with props)
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTracker.java
(with props)
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTrackerListener.java
(with props)
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/package-info.java
(with props)
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=773328&r1=773327&r2=773328&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
Sun May 10 12:06:54 2009
@@ -26,16 +26,15 @@
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
+import org.apache.axis2.transport.base.tracker.AxisServiceFilter;
+import org.apache.axis2.transport.base.tracker.AxisServiceTracker;
+import org.apache.axis2.transport.base.tracker.AxisServiceTrackerListener;
import org.apache.axis2.transport.TransportListener;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.engine.AxisObserver;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.axis2.engine.AxisEvent;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axiom.om.util.UUIDGenerator;
-import org.apache.axiom.om.OMElement;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -58,8 +57,10 @@
protected int state = BaseConstants.STOPPED;
/** is this transport non-blocking? */
protected boolean isNonBlocking = false;
- /** the axis observer that gets notified of service life cycle events*/
- private final AxisObserver axisObserver = new GenericAxisObserver();
+ /**
+ * Service tracker used to invoke {...@link
#internalStartListeningForService(AxisService)}
+ * and {...@link #internalStopListeningForService(AxisService)}. */
+ private AxisServiceTracker serviceTracker;
/** the thread pool to execute actual poll invocations */
protected WorkerPool workerPool = null;
@@ -101,7 +102,23 @@
}
// register to receive updates on services for lifetime management
- cfgCtx.getAxisConfiguration().addObservers(axisObserver);
+ serviceTracker = new AxisServiceTracker(
+ cfgCtx.getAxisConfiguration(),
+ new AxisServiceFilter() {
+ public boolean matches(AxisService service) {
+ return !service.getName().startsWith("__") // these
are "private" services
+ && BaseUtils.isUsingTransport(service,
getTransportName());
+ }
+ },
+ new AxisServiceTrackerListener() {
+ public void serviceAdded(AxisService service) {
+ internalStartListeningForService(service);
+ }
+
+ public void serviceRemoved(AxisService service) {
+ internalStopListeningForService(service);
+ }
+ });
// register with JMX
mbeanSupport = new TransportMBeanSupport(this, getTransportName());
@@ -132,11 +149,8 @@
if (state == BaseConstants.STARTED) {
state = BaseConstants.STOPPED;
// cancel receipt of service lifecycle events
-
cfgCtx.getAxisConfiguration().getObserversList().remove(axisObserver);
log.info(getTransportName().toUpperCase() + " Listener Shutdown");
- for (AxisService service : getListeningServices()) {
- internalStopListeningForService(service);
- }
+ serviceTracker.stop();
}
}
@@ -147,31 +161,10 @@
// cfgCtx.getAxisConfiguration().addObservers(axisObserver);
log.info(getTransportName().toUpperCase() + " Listener started");
// iterate through deployed services and start
- for (AxisService service : getListeningServices()) {
- internalStartListeningForService(service);
- }
+ serviceTracker.start();
}
}
- /**
- * Get the list of services that are listening on this transport, i.e.
that are
- * configured to use this transport.
- *
- * @return the list of listening services
- */
- private List<AxisService> getListeningServices() {
- List<AxisService> result = new LinkedList<AxisService>();
- Iterator services =
cfgCtx.getAxisConfiguration().getServices().values().iterator();
- while (services.hasNext()) {
- AxisService service = (AxisService) services.next();
- if (!ignoreService(service)
- && BaseUtils.isUsingTransport(service,
getTransportName())) {
- result.add(service);
- }
- }
- return result;
- }
-
public EndpointReference[] getEPRsForService(String serviceName, String
ip) throws AxisFault {
return getEPRsForService(serviceName);
}
@@ -180,10 +173,6 @@
return null;
}
- private boolean ignoreService(AxisService service) {
- return service.getName().startsWith("__"); // these are "private"
services
- }
-
public void disableTransportForService(AxisService service) {
log.warn("Disabling the " + getTransportName() + " transport for the
service "
@@ -204,7 +193,7 @@
}
}
- private void internalStartListeningForService(AxisService service) {
+ void internalStartListeningForService(AxisService service) {
String serviceName = service.getName();
try {
startListeningForService(service);
@@ -237,7 +226,7 @@
getEndpointMBeanName(serviceName));
}
- private void internalStopListeningForService(AxisService service) {
+ void internalStopListeningForService(AxisService service) {
unregisterMBean(getEndpointMBeanName(service.getName()));
stopListeningForService(service);
}
@@ -352,47 +341,6 @@
return metrics;
}
- /**
- * An AxisObserver which will start listening for newly deployed or
started services,
- * and stop listening when services are undeployed or stopped.
- */
- class GenericAxisObserver implements AxisObserver {
-
- // The initilization code will go here
- public void init(AxisConfiguration axisConfig) {
- }
-
- public void serviceUpdate(AxisEvent event, AxisService service) {
-
- if (!ignoreService(service)
- && BaseUtils.isUsingTransport(service,
getTransportName())) {
- switch (event.getEventType()) {
- case AxisEvent.SERVICE_DEPLOY :
- internalStartListeningForService(service);
- break;
- case AxisEvent.SERVICE_REMOVE :
- internalStopListeningForService(service);
- break;
- case AxisEvent.SERVICE_START :
- internalStartListeningForService(service);
- break;
- case AxisEvent.SERVICE_STOP :
- internalStopListeningForService(service);
- break;
- }
- }
- }
-
- public void moduleUpdate(AxisEvent event, AxisModule module) {}
- public void addParameter(Parameter param) throws AxisFault {}
- public void removeParameter(Parameter param) throws AxisFault {}
- public void deserializeParameters(OMElement parameterElement) throws
AxisFault {}
- public Parameter getParameter(String name) { return null; }
- public ArrayList getParameters() { return null; }
- public boolean isParameterLocked(String parameterName) { return false;
}
- public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup
serviceGroup) {}
- }
-
// -- jmx/management methods--
/**
* Pause the listener - Stop accepting/processing new messages, but
continues processing existing
Added:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceFilter.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceFilter.java?rev=773328&view=auto
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceFilter.java
(added)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceFilter.java
Sun May 10 12:06:54 2009
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.tracker;
+
+import org.apache.axis2.description.AxisService;
+
+/**
+ * Filter for {...@link AxisService} instances. This interface is used by
+ * {...@link AxisServiceTracker}.
+ */
+public interface AxisServiceFilter {
+ /**
+ * Examine whether a given service matches the filter criteria.
+ *
+ * @param service the service to examine
+ * @return <code>true</code> if the service matches the filter criteria
+ */
+ boolean matches(AxisService service);
+}
Propchange:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTracker.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTracker.java?rev=773328&view=auto
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTracker.java
(added)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTracker.java
Sun May 10 12:06:54 2009
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.tracker;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.engine.AxisEvent;
+import org.apache.axis2.engine.AxisObserver;
+
+/**
+ * <p>Tracks services deployed in a given {...@link AxisConfiguration}.
+ * The tracker is configured with references to three objects:</p>
+ * <ol>
+ * <li>An {...@link AxisConfiguration} to watch.</li>
+ * <li>An {...@link AxisServiceFilter} restricting the services to
track.</li>
+ * <li>An {...@link AxisServiceTrackerListener} receiving tracking
events.</li>
+ * </ol>
+ * <p>An instance of this class maintains an up-to-date list of services
+ * satisfying all of the following criteria:</p>
+ * <ol>
+ * <li>The service is deployed in the given {...@link
AxisConfiguration}.</li>
+ * <li>The service is started, i.e. {...@link AxisService#isActive()}
returns true.</li>
+ * <li>The service matches the criteria specified by the given
+ * {...@link AxisServiceFilter} instance.</li>
+ * </ol>
+ * <p>Whenever a service appears on the list, the tracker will call
+ * {...@link AxisServiceTrackerListener#serviceAdded(AxisService)}. When a
service disappears, it
+ * will call {...@link
AxisServiceTrackerListener#serviceRemoved(AxisService)}.</p>
+ * <p>When the tracker is created, it is initially in the stopped state. In
this state no
+ * events will be sent to the listener. It can be started using {...@link
#start()} and stopped again
+ * using {...@link #stop()}. The tracker list is defined to be empty when the
tracker is in the
+ * stopped state. This implies that a call to {...@link #start()} will generate
+ * {...@link AxisServiceTrackerListener#serviceAdded(AxisService)} events for
all services that meet
+ * the above criteria at that point in time. In the same way, {...@link
#stop()} will generate
+ * {...@link AxisServiceTrackerListener#serviceRemoved(AxisService)} events
for the current entries
+ * in the list.</p>
+ * <p>As a corollary the tracker guarantees that during a complete lifecycle
(start-stop),
+ * there will be exactly one {...@link
AxisServiceTrackerListener#serviceRemoved(AxisService)} event
+ * for every {...@link AxisServiceTrackerListener#serviceAdded(AxisService)}
event and vice-versa.
+ * This property is important when the tracker is used to allocate resources
for a dynamic set
+ * of services.</p>
+ *
+ * <h2>Limitations</h2>
+ *
+ * <p>The tracker is not able to detect property changes on services. E.g. if
a service initially
+ * matches the filter criteria, but later changes so that it doesn't match the
criteria any more,
+ * the tracker will not be able to detect this and the service will not be
removed from the tracker
+ * list.</p>
+ */
+public class AxisServiceTracker {
+ private final AxisObserver observer = new AxisObserver() {
+ public void init(AxisConfiguration axisConfig) {}
+
+ public void serviceUpdate(AxisEvent event, final AxisService service) {
+ switch (event.getEventType()) {
+ case AxisEvent.SERVICE_DEPLOY:
+ case AxisEvent.SERVICE_START:
+ if (filter.matches(service)) {
+ boolean pending;
+ synchronized (lock) {
+ if (pending = (pendingActions != null)) {
+ pendingActions.add(new Runnable() {
+ public void run() {
+ serviceAdded(service);
+ }
+ });
+ }
+ }
+ if (!pending) {
+ serviceAdded(service);
+ }
+ }
+ break;
+ case AxisEvent.SERVICE_REMOVE:
+ case AxisEvent.SERVICE_STOP:
+ // Don't check filter here because the properties of the
service may have
+ // changed in the meantime.
+ boolean pending;
+ synchronized (lock) {
+ if (pending = (pendingActions != null)) {
+ pendingActions.add(new Runnable() {
+ public void run() {
+ serviceRemoved(service);
+ }
+ });
+ }
+ }
+ if (!pending) {
+ serviceRemoved(service);
+ }
+ }
+ }
+
+ public void moduleUpdate(AxisEvent event, AxisModule module) {}
+ public void addParameter(Parameter param) throws AxisFault {}
+ public void removeParameter(Parameter param) throws AxisFault {}
+ public void deserializeParameters(OMElement parameterElement) throws
AxisFault {}
+ public Parameter getParameter(String name) { return null; }
+ public ArrayList<Parameter> getParameters() { return null; }
+ public boolean isParameterLocked(String parameterName) { return false;
}
+ public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup
serviceGroup) {}
+ };
+
+ private final AxisConfiguration config;
+ final AxisServiceFilter filter;
+ private final AxisServiceTrackerListener listener;
+
+ /**
+ * Object used to synchronize access to {...@link #pendingActions} and
{...@link #services}.
+ */
+ final Object lock = new Object();
+
+ /**
+ * Queue for notifications received by the {...@link AxisObserver} during
startup of the tracker.
+ * We need this because the events may already be reflected in the list of
services returned
+ * by {...@link AxisConfiguration#getServices()} (getting the list of
currently deployed services
+ * and adding the observer can't be done atomically). It also allows us to
make sure that
+ * events are sent to the listener in the right order, e.g. when a service
is being removed
+ * during startup of the tracker.
+ */
+ Queue<Runnable> pendingActions;
+
+ /**
+ * The current list of services. <code>null</code> if the tracker is
stopped.
+ */
+ private Set<AxisService> services;
+
+ public AxisServiceTracker(AxisConfiguration config, AxisServiceFilter
filter,
+ AxisServiceTrackerListener listener) {
+ this.config = config;
+ this.filter = filter;
+ this.listener = listener;
+ }
+
+ /**
+ * Check whether the tracker is started.
+ *
+ * @return <code>true</code> if the tracker is started
+ */
+ public boolean isStarted() {
+ return services != null;
+ }
+
+ /**
+ * Start the tracker.
+ *
+ * @throws IllegalStateException if the tracker has already been started
+ */
+ public void start() {
+ if (services != null) {
+ throw new IllegalStateException();
+ }
+ synchronized (lock) {
+ pendingActions = new LinkedList<Runnable>();
+ config.addObservers(observer);
+ services = new HashSet<AxisService>();
+ }
+ for (AxisService service : config.getServices().values()) {
+ if (service.isActive() && filter.matches(service)) {
+ serviceAdded(service);
+ }
+ }
+ while (true) {
+ Runnable action;
+ synchronized (lock) {
+ action = pendingActions.poll();
+ if (action == null) {
+ pendingActions = null;
+ break;
+ }
+ }
+ action.run();
+ }
+ }
+
+ void serviceAdded(AxisService service) {
+ // callListener may be false because the observer got an event for a
service that
+ // was already in the initial list of services retrieved by
AxisConfiguration#getServices.
+ boolean callListener;
+ synchronized (lock) {
+ callListener = services.add(service);
+ }
+ if (callListener) {
+ listener.serviceAdded(service);
+ }
+ }
+
+ void serviceRemoved(AxisService service) {
+ // callListener may be false because the observer invokes this method
without applying the
+ // filter.
+ boolean callListener;
+ synchronized (lock) {
+ callListener = services.remove(service);
+ }
+ if (callListener) {
+ listener.serviceRemoved(service);
+ }
+ }
+
+ /**
+ * Stop the tracker.
+ *
+ * @throws IllegalStateException if the tracker is not started
+ */
+ public void stop() {
+ if (services == null) {
+ throw new IllegalStateException();
+ }
+ // TODO: This is very bad, but AxisConfiguration has no removeObserver
method!
+ config.getObserversList().remove(observer);
+ for (AxisService service : services) {
+ listener.serviceRemoved(service);
+ }
+ services = null;
+ }
+}
Propchange:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTracker.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTrackerListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTrackerListener.java?rev=773328&view=auto
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTrackerListener.java
(added)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTrackerListener.java
Sun May 10 12:06:54 2009
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.tracker;
+
+import org.apache.axis2.description.AxisService;
+
+/**
+ * Listener for events generated by an {...@link AxisServiceTracker}.
+ */
+public interface AxisServiceTrackerListener {
+ /**
+ * Inform the listener that a service has been added to tracker list.
+ *
+ * @param service the service that has been added to the tracker list
+ */
+ void serviceAdded(AxisService service);
+
+ /**
+ * Inform the listener that a service has been removed from the tracker
list.
+ *
+ * @param service the service that has been removed from the tracker list
+ */
+ void serviceRemoved(AxisService service);
+}
Propchange:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/AxisServiceTrackerListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/package-info.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/package-info.java?rev=773328&view=auto
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/package-info.java
(added)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/package-info.java
Sun May 10 12:06:54 2009
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Contains utility classes to track a dynamic set of services deployed in an
+ * Axis configuration.
+ *
+ * @see org.apache.axis2.transport.base.tracker.AxisServiceTracker
+ */
+package org.apache.axis2.transport.base.tracker;
\ No newline at end of file
Propchange:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/tracker/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native