masonjm     2004/08/09 16:11:35

  Modified:    src/share/org/apache/slide/cluster
                        ClusterCacheRefresher.java
  Log:
  - Readded threading
  - Pre-parse uris to match the uri format used as cache keys (remove "/slide", 
basically)
  
  Revision  Changes    Path
  1.3       +103 -27   
jakarta-slide/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java
  
  Index: ClusterCacheRefresher.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-slide/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ClusterCacheRefresher.java        9 Aug 2004 11:51:53 -0000       1.2
  +++ ClusterCacheRefresher.java        9 Aug 2004 23:11:34 -0000       1.3
  @@ -208,24 +208,34 @@
               final int pollInterval = node.getAttributeAsInt("poll-interval", 60000);
               final boolean udp = node.getAttributeAsBoolean("udp", true);
               final String uri = node.getAttribute("base-uri", "/");
  -            final int depth = Integer.MAX_VALUE;
  +            // TODO: This needs to be infinity. This is waiting on bug #30527
  +            final int depth = 1;
               final int lifetime = node.getAttributeAsInt("subscription-lifetime", 
3600);
               final int notificationDelay = 
node.getAttributeAsInt("notification-delay", 0);
   
               final Subscriber contentSubscriber = new Subscriber() {
                   public void notify(String uri, Map information) {
                       NamespaceAccessToken nat = Domain.accessNamespace(new 
SecurityToken(this), Domain.getDefaultNamespace());
  -                    Iterator keys = information.keySet().iterator();
  -                    while (keys.hasNext()) {
  -                        String key = keys.next().toString();
  -                        if ("uri".equals(key)) {
  -                            Uri theUri = nat.getUri(new SlideTokenImpl(new 
CredentialsToken("")), information.get(key).toString());
  -                            Store store = theUri.getStore();
  -                            if (store instanceof ExtendedStore) {
  -                                Domain.log("Resetting cache for " + theUri, 
LOG_CHANNEL, Logger.INFO);
  -                                ((ExtendedStore) 
store).removeObjectFromCache(theUri);
  +                    try {
  +                        nat.begin();
  +                        Iterator keys = information.keySet().iterator();
  +                        while (keys.hasNext()) {
  +                            String key = keys.next().toString();
  +                            if ("uri".equals(key)) {
  +                                Uri theUri = nat.getUri(new SlideTokenImpl(new 
CredentialsToken("")), stripUri(information.get(key).toString()));
  +                                Store store = theUri.getStore();
  +                                if (store instanceof ExtendedStore) {
  +                                    Domain.log("Resetting cache for " + theUri, 
LOG_CHANNEL, Logger.INFO);
  +                                    ((ExtendedStore) 
store).removeObjectFromCache(theUri);
  +                                }
                               }
                           }
  +                        nat.commit();
  +                    } catch(Exception e) {
  +                     if (Domain.isEnabled(LOG_CHANNEL, Logger.ERROR)) {
  +                             Domain.log("Error clearing cache: " + e + ". See 
stderr for stacktrace.", LOG_CHANNEL, Logger.ERROR);
  +                             e.printStackTrace();
  +                     }
                       }
                   }
               };
  @@ -233,28 +243,94 @@
               final Subscriber structureSubscriber = new Subscriber() {
                   public void notify(String uri, Map information) {
                       NamespaceAccessToken nat = Domain.accessNamespace(new 
SecurityToken(this), Domain.getDefaultNamespace());
  -                    Iterator keys = information.keySet().iterator();
  -                    while (keys.hasNext()) {
  -                        String key = keys.next().toString();
  -                        if ("uri".equals(key)) {
  -                            Uri theUri = nat.getUri(new SlideTokenImpl(new 
CredentialsToken("")), information.get(key).toString());
  -                            Store store = theUri.getParentUri().getStore();
  -                            if (store instanceof ExtendedStore) {
  -                                Domain.log("Resetting cache for " + 
theUri.getParentUri(), LOG_CHANNEL, Logger.INFO);
  -                                ((ExtendedStore) 
store).removeObjectFromCache(theUri.getParentUri());
  -                            }
  -                        }
  +                    try {
  +                        nat.begin();
  +                         Iterator keys = information.keySet().iterator();
  +                         while (keys.hasNext()) {
  +                             String key = keys.next().toString();
  +                             if ("uri".equals(key)) {
  +                                 Uri theUri = nat.getUri(new SlideTokenImpl(new 
CredentialsToken("")), stripUri(information.get(key).toString()));
  +                                 Store store = theUri.getParentUri().getStore();
  +                                 if (store instanceof ExtendedStore) {
  +                                     Domain.log("Resetting cache for " + 
theUri.getParentUri(), LOG_CHANNEL, Logger.INFO);
  +                                     ((ExtendedStore) 
store).removeObjectFromCache(theUri.getParentUri());
  +                                 }
  +                             }
  +                         }
  +                        nat.commit();
  +                    } catch(Exception e) {
  +                     if (Domain.isEnabled(LOG_CHANNEL, Logger.ERROR)) {
  +                             Domain.log("Error clearing cache: " + e + ". See 
stderr for stacktrace.", LOG_CHANNEL, Logger.ERROR);
  +                             e.printStackTrace();
  +                     }
                       }
                   }
               };
   
  -            listener = new NotificationListener(host, port, repositoryHost, 
repositoryPort, protocol, credentials,
  -                    repositoryDomain, pollInterval, udp);
  +            /*
  +             * This needs to be done in a thread for three reasons:
  +             * 1) If NotificationListener.subscribe() connects to a Slide instance
  +             *    that is in the process of starting it will wait until the server
  +             *    has finished starting before it returns. If configuration is
  +             *    single-thread this prevents this Slide instance from starting
  +             *    until all other Slide instances have started. This means none
  +             *    of them can start if they're all waiting for each other. Simple
  +             *    test case of this is a cluster of one instance. It never starts.
  +             * 2) Allows for renewing of subscriptions.
  +             * 3) Allows for retrying failed subscriptions. This will happen if
  +             *    a server is down and NotificationListener.subscribe() can't
  +             *    reach it.
  +             */
  +            Thread t = new Thread(new Runnable() {
  +             
  +             private boolean success;
  +             
  +                public void run() {
  +                     success = true;
  +                    listener = new NotificationListener(host, port, repositoryHost, 
repositoryPort, protocol, credentials,
  +                        repositoryDomain, pollInterval, udp);
   
  -            listener.subscribe("Update", uri, depth, lifetime, notificationDelay, 
contentSubscriber, credentials);
  -            listener.subscribe("Update/newmember", uri, depth, lifetime, 
notificationDelay, structureSubscriber, credentials);
  -            listener.subscribe("Delete", uri, depth, lifetime, notificationDelay, 
structureSubscriber, credentials);
  -            listener.subscribe("Move", uri, depth, lifetime, notificationDelay, 
structureSubscriber, credentials);
  +                    success = listener.subscribe("Update", uri, depth, lifetime, 
notificationDelay, contentSubscriber, credentials);
  +                    success = listener.subscribe("Update/newmember", uri, depth, 
lifetime, notificationDelay, structureSubscriber, credentials);
  +                    success = listener.subscribe("Delete", uri, depth, lifetime, 
notificationDelay, structureSubscriber, credentials);
  +                    success = listener.subscribe("Move", uri, depth, lifetime, 
notificationDelay, structureSubscriber, credentials);
  +                    
  +                    if ( !success ) {
  +                                             // try again quickly
  +                     try {
  +                                                     Thread.sleep(10000);
  +                                             } catch (InterruptedException e) {
  +                                                     // ignore
  +                                             }
  +                    } else {
  +                                             // try again before the subscriptions 
expire
  +                     try {
  +                                                     Thread.sleep(lifetime*1000-60);
  +                                             } catch (InterruptedException e) {
  +                                                     // ignore
  +                                             }
  +                    }
  +                }
  +            });
  +            t.setDaemon(true);
  +            t.start();
           }
  +    }
  +    
  +    /**
  +     * Removes the first segment of a uri. "/slide/files/foo" becomes
  +     * "/files/foo".
  +     * 
  +     * @param uri the uri to strip
  +     * @return the stipped uri
  +     */
  +    private String stripUri(String uri) {
  +     if ( uri.indexOf("/") == 0 ) {
  +             uri = uri.substring(1);
  +     }
  +     if ( uri.indexOf("/") > -1 ) {
  +             uri = uri.substring(uri.indexOf("/"));
  +     }
  +     return uri;
       }
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to