Author: vgritsenko Date: Wed Oct 20 05:46:34 2004 New Revision: 55146 Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java Log: Resolver should be nulled out in recycle only when all threads are finished (and released their sources)
Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java Wed Oct 20 05:46:34 2004 @@ -93,13 +93,14 @@ * than in series, in one thread. This parameter can be set in either the transformer * definition (to affect all IncludeTransformer instances):</p> * <pre> - * <parallel>true</parallel> + * <parallel>true</parallel> * </pre> + * * <p>or in a pipeline itself (to only affect that instance of the IncludeTransformer):</p> * <pre> - * <map:parameter name="parallel" value="true"/> + * <map:parameter name="parallel" value="true"/> * </pre> - * <p>It defaults to "false".</p> + * <p>By default, parallel processing is turned off.</p> * * @cocoon.sitemap.component.name include * @cocoon.sitemap.component.logger sitemap.transformer.include @@ -133,6 +134,9 @@ /** <p>The encoding to use for parameter names and values.</p> */ private static final String ENCODING = "US-ASCII"; + // + // Global configuration + // /** <p>The {@link ServiceManager} instance associated with this instance.</p> */ private ServiceManager m_manager; @@ -187,9 +191,15 @@ /** * <p>The IncludeBuffer that is used to buffering events if parallel * processing is turned on</p> + * <p>This object is also used as a lock for thread counter m_threads</p> */ private SaxBuffer m_buffer; + /** + * <p>Inclusion threads/tasks counter (if executing in parallel)</p> + */ + private int m_threads; + /** * <p>Create a new {@link IncludeTransformer} instance.</p> @@ -241,12 +251,21 @@ */ public void recycle() { this.m_namespaces = null; - this.m_resolver = null; this.m_validity = null; this.x_parameters = null; 
this.x_value = null; - this.m_buffering = false; - this.m_buffer = null; + + if (this.m_buffering) { + // Wait for threads to complete and release Sources + waitForThreads(); + this.m_buffering = false; + this.m_buffer = null; + } + + // Resolver can be nulled out when all threads completed processing + // and released their Sources. + this.m_resolver = null; + super.recycle(); } @@ -455,7 +474,9 @@ /* Check for parallel processing */ if (this.m_parallel) { this.m_buffering = true; - if (m_buffer == null) m_buffer = new SaxBuffer(); + if (m_buffer == null) { + m_buffer = new SaxBuffer(); + } m_buffer.xmlizable(new IncludeBuffer(source)); } else { SourceUtil.toSAX(this.m_manager, source, "text/xml", @@ -546,6 +567,44 @@ } /** + * Increment active threads counter + */ + int incrementThreads() { + synchronized (m_buffer) { + return ++m_threads; + } + } + + /** + * Decrement active threads counter + */ + void decrementThreads() { + synchronized (m_buffer) { + if (--m_threads <= 0) { + m_buffer.notify(); + } + } + } + + /** + * Wait till there is no active threads + */ + private void waitForThreads() { + synchronized (m_buffer) { + if (m_threads > 0) { + if (getLogger().isDebugEnabled()) { + getLogger().debug(m_threads + " threads in progress, waiting"); + } + + try { + m_buffer.wait(); + } catch (InterruptedException ignored) { } + // Don't continue waiting if interrupted. + } + } + } + + /** * Buffer for loading included source in separate thread. * Streaming of the loaded buffer possible only when source is * loaded completely. If loading is not complete, toSAX method @@ -558,36 +617,56 @@ public IncludeBuffer(Source source) { this.source = source; + // FIXME Need thread pool component. Based on EDU.oswego.cs.dl.util.concurrent.PooledExecutor. 
// See also org.apache.cocoon.components.cron.QuartzJobScheduler.ThreadPool - Thread t = new Thread(this); - t.setName("IncludeSource#" + source.getURI()); - t.setDaemon(true); - t.start(); + try { + Thread t = new Thread(this); + t.setName("IncludeSource#" + source.getURI()); + t.setDaemon(true); + t.start(); + } catch (RuntimeException e) { + // In case we failed to spawn a thread + this.e = new SAXException(e); + m_resolver.release(source); + throw e; + } } - public synchronized void toSAX(ContentHandler contentHandler) + /** + * Stream content of this buffer when it is loaded completely. + * This method blocks if loading is not complete. + */ + public void toSAX(ContentHandler contentHandler) throws SAXException { - if (!finished) { - try { - wait(); - } catch (InterruptedException ignored) { } + synchronized (this) { + if (!this.finished) { + try { + wait(); + } catch (InterruptedException ignored) { } + // Don't continue waiting if interrupted. + } } - if (e != null) { - throw e; + if (this.e != null) { + throw this.e; } super.toSAX(contentHandler); } + /** + * Load content of the source into this buffer. 
+ */ public void run() { + // Increment active threads counter + int t = incrementThreads(); if (getLogger().isDebugEnabled()) { - getLogger().debug("Loading <" + source.getURI() + ">"); + getLogger().debug("Thread #" + t + " loading <" + source.getURI() + ">"); } try { - SourceUtil.toSAX(m_manager, source, "text/xml", new EmbeddedXMLPipe(this)); + SourceUtil.toSAX(m_manager, this.source, "text/xml", new EmbeddedXMLPipe(this)); } catch (Exception e) { if (!(e instanceof SAXException)) { this.e = new SAXException(e); @@ -596,18 +675,20 @@ } } finally { synchronized (this) { - finished = true; + this.finished = true; notify(); } - m_resolver.release(source); + // Release source and decrement active threads counter + m_resolver.release(this.source); + decrementThreads(); } if (getLogger().isDebugEnabled()) { if (this.e == null) { - getLogger().debug("Loaded <" + source.getURI() + ">"); + getLogger().debug("Thread #" + t + " loaded <" + source.getURI() + ">"); } else { - getLogger().debug("Failed to load <" + source.getURI() + ">", this.e); + getLogger().debug("Thread #" + t + " failed to load <" + source.getURI() + ">", this.e); } } }