masonjm 2004/08/09 16:11:35
Modified: src/share/org/apache/slide/cluster
ClusterCacheRefresher.java
Log:
- Readded threading
- Pre-parse uris to match the uri format used as cache keys (remove "/slide",
basically)
Revision Changes Path
1.3 +103 -27
jakarta-slide/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java
Index: ClusterCacheRefresher.java
===================================================================
RCS file:
/home/cvs/jakarta-slide/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ClusterCacheRefresher.java 9 Aug 2004 11:51:53 -0000 1.2
+++ ClusterCacheRefresher.java 9 Aug 2004 23:11:34 -0000 1.3
@@ -208,24 +208,34 @@
final int pollInterval = node.getAttributeAsInt("poll-interval", 60000);
final boolean udp = node.getAttributeAsBoolean("udp", true);
final String uri = node.getAttribute("base-uri", "/");
- final int depth = Integer.MAX_VALUE;
+ // TODO: This needs to be infinity. This is waiting on bug #30527
+ final int depth = 1;
final int lifetime = node.getAttributeAsInt("subscription-lifetime",
3600);
final int notificationDelay =
node.getAttributeAsInt("notification-delay", 0);
final Subscriber contentSubscriber = new Subscriber() {
public void notify(String uri, Map information) {
NamespaceAccessToken nat = Domain.accessNamespace(new
SecurityToken(this), Domain.getDefaultNamespace());
- Iterator keys = information.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next().toString();
- if ("uri".equals(key)) {
- Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), information.get(key).toString());
- Store store = theUri.getStore();
- if (store instanceof ExtendedStore) {
- Domain.log("Resetting cache for " + theUri,
LOG_CHANNEL, Logger.INFO);
- ((ExtendedStore)
store).removeObjectFromCache(theUri);
+ try {
+ nat.begin();
+ Iterator keys = information.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next().toString();
+ if ("uri".equals(key)) {
+ Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), stripUri(information.get(key).toString()));
+ Store store = theUri.getStore();
+ if (store instanceof ExtendedStore) {
+ Domain.log("Resetting cache for " + theUri,
LOG_CHANNEL, Logger.INFO);
+ ((ExtendedStore)
store).removeObjectFromCache(theUri);
+ }
}
}
+ nat.commit();
+ } catch(Exception e) {
+ if (Domain.isEnabled(LOG_CHANNEL, Logger.ERROR)) {
+ Domain.log("Error clearing cache: " + e + ". See
stderr for stacktrace.", LOG_CHANNEL, Logger.ERROR);
+ e.printStackTrace();
+ }
}
}
};
@@ -233,28 +243,94 @@
final Subscriber structureSubscriber = new Subscriber() {
public void notify(String uri, Map information) {
NamespaceAccessToken nat = Domain.accessNamespace(new
SecurityToken(this), Domain.getDefaultNamespace());
- Iterator keys = information.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next().toString();
- if ("uri".equals(key)) {
- Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), information.get(key).toString());
- Store store = theUri.getParentUri().getStore();
- if (store instanceof ExtendedStore) {
- Domain.log("Resetting cache for " +
theUri.getParentUri(), LOG_CHANNEL, Logger.INFO);
- ((ExtendedStore)
store).removeObjectFromCache(theUri.getParentUri());
- }
- }
+ try {
+ nat.begin();
+ Iterator keys = information.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next().toString();
+ if ("uri".equals(key)) {
+ Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), stripUri(information.get(key).toString()));
+ Store store = theUri.getParentUri().getStore();
+ if (store instanceof ExtendedStore) {
+ Domain.log("Resetting cache for " +
theUri.getParentUri(), LOG_CHANNEL, Logger.INFO);
+ ((ExtendedStore)
store).removeObjectFromCache(theUri.getParentUri());
+ }
+ }
+ }
+ nat.commit();
+ } catch(Exception e) {
+ if (Domain.isEnabled(LOG_CHANNEL, Logger.ERROR)) {
+ Domain.log("Error clearing cache: " + e + ". See
stderr for stacktrace.", LOG_CHANNEL, Logger.ERROR);
+ e.printStackTrace();
+ }
}
}
};
- listener = new NotificationListener(host, port, repositoryHost,
repositoryPort, protocol, credentials,
- repositoryDomain, pollInterval, udp);
+ /*
+ * This needs to be done in a thread for three reasons:
+ * 1) If NotificationListener.subscribe() connects to a Slide instance
+ * that is in the process of starting it will wait until the server
+ * has finished starting before it returns. If configuration is
+ * single-thread this prevents this Slide instance from starting
+ * until all other Slide instances have started. This means none
+ * of them can start if they're all waiting for each other. Simple
+ * test case of this is a cluster of one instance. It never starts.
+ * 2) Allows for renewing of subscriptions.
+ * 3) Allows for retrying failed subscriptions. This will happen if
+ * a server is down and NotificationListener.subscribe() can't
+ * reach it.
+ */
+ Thread t = new Thread(new Runnable() {
+
+ private boolean success;
+
+ public void run() {
+ success = true;
+ listener = new NotificationListener(host, port, repositoryHost,
repositoryPort, protocol, credentials,
+ repositoryDomain, pollInterval, udp);
- listener.subscribe("Update", uri, depth, lifetime, notificationDelay,
contentSubscriber, credentials);
- listener.subscribe("Update/newmember", uri, depth, lifetime,
notificationDelay, structureSubscriber, credentials);
- listener.subscribe("Delete", uri, depth, lifetime, notificationDelay,
structureSubscriber, credentials);
- listener.subscribe("Move", uri, depth, lifetime, notificationDelay,
structureSubscriber, credentials);
+ success = listener.subscribe("Update", uri, depth, lifetime,
notificationDelay, contentSubscriber, credentials);
+ success = listener.subscribe("Update/newmember", uri, depth,
lifetime, notificationDelay, structureSubscriber, credentials);
+ success = listener.subscribe("Delete", uri, depth, lifetime,
notificationDelay, structureSubscriber, credentials);
+ success = listener.subscribe("Move", uri, depth, lifetime,
notificationDelay, structureSubscriber, credentials);
+
+ if ( !success ) {
+ // try again quickly
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ } else {
+ // try again before the subscriptions
expire
+ try {
+ Thread.sleep(lifetime*1000-60);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ });
+ t.setDaemon(true);
+ t.start();
}
+ }
+
+ /**
+ * Removes the first segment of a uri. "/slide/files/foo" becomes
+ * "/files/foo".
+ *
+ * @param uri the uri to strip
+ * @return the stipped uri
+ */
+ private String stripUri(String uri) {
+ if ( uri.indexOf("/") == 0 ) {
+ uri = uri.substring(1);
+ }
+ if ( uri.indexOf("/") > -1 ) {
+ uri = uri.substring(uri.indexOf("/"));
+ }
+ return uri;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]