Author: vgritsenko
Date: Wed Oct 20 05:46:34 2004
New Revision: 55146

Modified:
   
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
Log:
Resolver should be nulled out in recycle only when all threads are finished 
(and released their sources)


Modified: 
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
==============================================================================
--- 
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
    (original)
+++ 
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
    Wed Oct 20 05:46:34 2004
@@ -93,13 +93,14 @@
  * than in series, in one thread. This parameter can be set in either the 
transformer
  * definition (to affect all IncludeTransformer instances):</p>
  * <pre>
- * &lt;parallel&gt;true&lt;/parallel&gt;
+ *   &lt;parallel&gt;true&lt;/parallel&gt;
  * </pre>
+ *
  * <p>or in a pipeline itself (to only affect that instance of the 
IncludeTransformer):</p>
  * <pre>
- * &lt;map:parameter name="parallel" value="true"/&gt;
+ *   &lt;map:parameter name="parallel" value="true"/&gt;
  * </pre>
- * <p>It defaults to "false".</p>
+ * <p>By default, parallel processing is turned off.</p>
  *
  * @cocoon.sitemap.component.name   include
  * @cocoon.sitemap.component.logger sitemap.transformer.include
@@ -133,6 +134,9 @@
     /** <p>The encoding to use for parameter names and values.</p> */
     private static final String ENCODING = "US-ASCII";
 
+    //
+    // Global configuration
+    //
 
     /** <p>The {@link ServiceManager} instance associated with this instance.</p> */
     private ServiceManager m_manager;
@@ -187,9 +191,15 @@
     /**
      * <p>The IncludeBuffer that is used to buffering events if parallel
      * processing is turned on</p>
+     * <p>This object is also used as a lock for thread counter m_threads</p>
      */
     private SaxBuffer m_buffer;
 
+    /**
+     * <p>Inclusion threads/tasks counter (if executing in parallel)</p>
+     */
+    private int m_threads;
+
 
     /**
     * <p>Create a new {@link IncludeTransformer} instance.</p>
@@ -241,12 +251,21 @@
      */
     public void recycle() {
         this.m_namespaces = null;
-        this.m_resolver = null;
         this.m_validity = null;
         this.x_parameters = null;
         this.x_value = null;
-        this.m_buffering = false;
-        this.m_buffer = null;
+
+        if (this.m_buffering) {
+            // Wait for threads to complete and release Sources
+            waitForThreads();
+            this.m_buffering = false;
+            this.m_buffer = null;
+        }
+
+        // Resolver can be nulled out when all threads completed processing
+        // and released their Sources.
+        this.m_resolver = null;
+
         super.recycle();
     }
 
@@ -455,7 +474,9 @@
                     /* Check for parallel processing */
                     if (this.m_parallel) {
                         this.m_buffering = true;
-                        if (m_buffer == null) m_buffer = new SaxBuffer();
+                        if (m_buffer == null) {
+                            m_buffer = new SaxBuffer();
+                        }
                         m_buffer.xmlizable(new IncludeBuffer(source));
                     } else {
                         SourceUtil.toSAX(this.m_manager, source, "text/xml",
@@ -546,6 +567,44 @@
     }
 
     /**
+     * Register one more active inclusion thread.
+     * The counter is updated while holding the lock on m_buffer.
+     *
+     * @return value of the active threads counter after the increment
+     */
+    int incrementThreads() {
+        synchronized (m_buffer) {
+            m_threads += 1;
+            return m_threads;
+        }
+    }
+
+    /**
+     * Unregister one active inclusion thread.
+     * Once the counter has dropped to zero (or below), a thread waiting
+     * on the m_buffer lock is notified.
+     */
+    void decrementThreads() {
+        synchronized (m_buffer) {
+            m_threads -= 1;
+            if (m_threads < 1) {
+                m_buffer.notify();
+            }
+        }
+    }
+
+    /**
+     * Block until there are no active inclusion threads left, or until the
+     * calling thread is interrupted, whichever happens first.
+     */
+    private void waitForThreads() {
+        synchronized (m_buffer) {
+            if (m_threads > 0 && getLogger().isDebugEnabled()) {
+                getLogger().debug(m_threads + " threads in progress, waiting");
+            }
+
+            // Object.wait() may return without a matching notify (spurious
+            // wakeup), so re-check the counter after every wakeup instead of
+            // waiting only once.
+            while (m_threads > 0) {
+                try {
+                    m_buffer.wait();
+                } catch (InterruptedException e) {
+                    // Don't continue waiting if interrupted, but keep the
+                    // interrupt status visible to the caller.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
      * Buffer for loading included source in separate thread.
      * Streaming of the loaded buffer possible only when source is
      * loaded completely. If loading is not complete, toSAX method
@@ -558,36 +617,56 @@
 
         public IncludeBuffer(Source source) {
             this.source = source;
+
             // FIXME Need thread pool component. Based on 
EDU.oswego.cs.dl.util.concurrent.PooledExecutor.
             //       See also 
org.apache.cocoon.components.cron.QuartzJobScheduler.ThreadPool
-            Thread t = new Thread(this);
-            t.setName("IncludeSource#" + source.getURI());
-            t.setDaemon(true);
-            t.start();
+            try {
+                Thread t = new Thread(this);
+                t.setName("IncludeSource#" + source.getURI());
+                t.setDaemon(true);
+                t.start();
+            } catch (RuntimeException e) {
+                // In case we failed to spawn a thread
+                this.e = new SAXException(e);
+                m_resolver.release(source);
+                throw e;
+            }
         }
 
-        public synchronized void toSAX(ContentHandler contentHandler)
+        /**
+         * Stream content of this buffer when it is loaded completely.
+         * This method blocks if loading is not complete.
+         */
+        public void toSAX(ContentHandler contentHandler)
         throws SAXException {
-            if (!finished) {
-                try {
-                    wait();
-                } catch (InterruptedException ignored) { }
+            synchronized (this) {
+                if (!this.finished) {
+                    try {
+                        wait();
+                    } catch (InterruptedException ignored) { }
+                    // Don't continue waiting if interrupted.
+                }
             }
 
-            if (e != null) {
-                throw e;
+            if (this.e != null) {
+                throw this.e;
             }
 
             super.toSAX(contentHandler);
         }
 
+        /**
+         * Load content of the source into this buffer.
+         */
         public void run() {
+            // Increment active threads counter
+            int t = incrementThreads();
             if (getLogger().isDebugEnabled()) {
-                getLogger().debug("Loading <" + source.getURI() + ">");
+                getLogger().debug("Thread #" + t + " loading <" + 
source.getURI() + ">");
             }
 
             try {
-                SourceUtil.toSAX(m_manager, source, "text/xml", new 
EmbeddedXMLPipe(this));
+                SourceUtil.toSAX(m_manager, this.source, "text/xml", new 
EmbeddedXMLPipe(this));
             } catch (Exception e) {
                 if (!(e instanceof SAXException)) {
                     this.e = new SAXException(e);
@@ -596,18 +675,20 @@
                 }
             } finally {
                 synchronized (this) {
-                    finished = true;
+                    this.finished = true;
                     notify();
                 }
 
-                m_resolver.release(source);
+                // Release source and decrement active threads counter
+                m_resolver.release(this.source);
+                decrementThreads();
             }
 
             if (getLogger().isDebugEnabled()) {
                 if (this.e == null) {
-                    getLogger().debug("Loaded <" + source.getURI() + ">");
+                    getLogger().debug("Thread #" + t + " loaded <" + 
source.getURI() + ">");
                 } else {
-                    getLogger().debug("Failed to load <" + source.getURI() + 
">", this.e);
+                    getLogger().debug("Thread #" + t + " failed to load <" + 
source.getURI() + ">", this.e);
                 }
             }
         }

Reply via email to