Author: vgritsenko Date: Fri Oct 8 06:26:15 2004 New Revision: 54077 Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java cocoon/branches/BRANCH_2_1_X/status.xml Log: Implement parallel (multi-threaded) loading of included sources.
Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java Fri Oct 8 06:26:15 2004 @@ -1,12 +1,12 @@ /* * Copyright 1999-2004 The Apache Software Foundation. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -15,39 +15,46 @@ */ package org.apache.cocoon.transformation; -import java.io.IOException; -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.util.HashMap; -import java.util.Map; - +import org.apache.avalon.framework.configuration.Configurable; +import org.apache.avalon.framework.configuration.Configuration; +import org.apache.avalon.framework.configuration.ConfigurationException; import org.apache.avalon.framework.parameters.Parameters; import org.apache.avalon.framework.service.ServiceException; import org.apache.avalon.framework.service.ServiceManager; import org.apache.avalon.framework.service.Serviceable; + import org.apache.cocoon.ProcessingException; import org.apache.cocoon.caching.CacheableProcessingComponent; import org.apache.cocoon.components.source.SourceUtil; import org.apache.cocoon.components.source.impl.MultiSourceValidity; import org.apache.cocoon.environment.SourceResolver; import org.apache.cocoon.util.NetUtils; +import org.apache.cocoon.xml.EmbeddedXMLPipe; import org.apache.cocoon.xml.IncludeXMLConsumer; import org.apache.cocoon.xml.NamespacesTable; +import org.apache.cocoon.xml.SaxBuffer; + import org.apache.excalibur.source.Source; import org.apache.excalibur.source.SourceValidity; import org.xml.sax.Attributes; +import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; +import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.HashMap; +import java.util.Map; + /** - * <p>A simple transformer including resolvable sources (accessed through Cocoon's - * {@link SourceResolver} from its input.</p> - * + * <p>A simple transformer including resolvable sources (accessed through + * Cocoon's {@link SourceResolver}) from its input.</p> + * * <p>Inclusion is triggered by the <code><include ...
/></code> element * defined in the <code>http://apache.org/cocoon/include/1.0</code> namespace.</p> - * - * <p>Example:</p> * + * <p>Example:</p> * <pre> * <incl:include xmlns:incl="http://apache.org/cocoon/include/1.0" * src="cocoon://path/to/include"/> @@ -55,7 +62,7 @@ * * <p>Parameters to be passed to the included sources can be specified in two ways: * the first one is to encode them onto the source itelf, for example:</p> - * + * * <pre> * <incl:include xmlns:incl="http://apache.org/cocoon/include/1.0" * src="cocoon://path/to/include?paramA=valueA&paramB=valueB"/> @@ -65,7 +72,7 @@ * the transformer, so that one can easily pass parameter name or values containing * the <code>&</code> (amperstand) or <code>=</code> (equals) character, which are * reserved characters in URIs. An example:</p> - * + * * <pre> * <incl:include xmlns:incl="http://apache.org/cocoon/include/1.0" * src="cocoon://path/to/include"> @@ -73,13 +80,28 @@ * <incl:parameter name="other&Para=Name" value="other=Para&Value"/> * </incl:include> * </pre> - * + * * <p>An interesting feature of this {@link Transformer} is that it implements the * {@link CacheableProcessingComponent} interface and provides full support for * caching. In other words, if the input given to this transformer has not changed, * and all of the included sources are (cacheable) and still valid, this transformer * will not force a pipeline re-generation like the {@link CIncludeTransformer}.</p> - * + * + * <h3>Parallel Processing</h3> + * <p>Another feature of this {@link Transformer} is that it allows parallel processing + * of includes. By setting the optional parameter <code>parallel</code> to true, + * the various included contents are processed (included) in parallel threads rather + * than in series, in one thread.
This parameter can be set in either the transformer + * definition (to affect all IncludeTransformer instances):</p> + * <pre> + * <parallel>true</parallel> + * </pre> + * <p>or in a pipeline itself (to only affect that instance of the IncludeTransformer):</p> + * <pre> + * <map:parameter name="parallel" value="true"/> + * </pre> + * <p>It defaults to "false".</p> + * * @cocoon.sitemap.component.name include * @cocoon.sitemap.component.logger sitemap.transformer.include * @@ -87,48 +109,93 @@ * @cocoon.sitemap.component.pooling.max 16 * @cocoon.sitemap.component.pooling.grow 2 */ -public class IncludeTransformer extends AbstractTransformer -implements Serviceable, Transformer, CacheableProcessingComponent { +public class IncludeTransformer extends AbstractTransformer + implements Serviceable, Configurable, + Transformer, CacheableProcessingComponent { /** <p>The namespace URI of the elements recognized by this transformer.</p> */ private static final String NS_URI = "http://apache.org/cocoon/include/1.0"; + /** <p>The name of the element triggering inclusion of sources.</p> */ private static final String INCLUDE_ELEMENT = "include"; + /** <p>The name of the element defining an included subrequest parameter.</p> */ private static final String PARAMETER_ELEMENT = "parameter"; + /** <p>The name of the attribute indicating the included source URI.</p> */ private static final String SRC_ATTRIBUTE = "src"; + /** <p>The name of the attribute indicating the parameter name.</p> */ private static final String NAME_ATTRIBUTE = "name"; + /** <p>The name of the attribute indicating the parameter name.</p> */ private static final String VALUE_ATTRIBUTE = "value"; - + /** <p>The encoding to use for parameter names and values.</p> */ private static final String ENCODING = "US-ASCII"; + /** <p>The {@link ServiceManager} instance associated with this instance.</p> */ private ServiceManager m_manager; + + /** + * <p>Configuration option controlling parallel (in multiple
threads) + * includes processing</p> + */ + private boolean defaultParallel; + + // + // Current configuration + // + /** <p>The {@link SourceResolver} used to resolve included URIs.</p> */ private SourceResolver m_resolver; + + /** + * <p>Pipeline parameter controlling parallel (in multiple threads) + * includes processing</p> + */ + private boolean m_parallel; + + // + // Current state + // + /** <p>The {@link SourceValidity} instance associated with this request.</p> */ private MultiSourceValidity m_validity; + /** <p>A {@link NamespacesTable} used to filter namespace declarations.</p> */ private NamespacesTable m_namespaces; /** <p>A {@link Map} of the parameters to supply to the included source.</p> */ private Map x_parameters; + /** <p>The source to be included declared in an include element.</p> */ - private String x_source; + private String x_source; + /** <p>The current parameter name captured.</p> */ private String x_parameter; + /** <p>The current parameter value (as a {@link StringBuffer}).</p> */ private StringBuffer x_value; - + + /** + * <p>If parallel processing is enabled, then this boolean tells us + * whether buffering has started yet.</p> + */ + private boolean m_buffering; + + /** + * <p>The {@link SaxBuffer} used to buffer events if parallel + * processing is turned on.</p> + */ + private SaxBuffer m_buffer; + + /** * <p>Create a new {@link IncludeTransformer} instance.</p> */ public IncludeTransformer() { - super(); } /** @@ -140,19 +207,32 @@ this.m_manager = manager; } + /* (non-Javadoc) + * @see Configurable#configure(Configuration) + */ + public void configure(Configuration configuration) throws ConfigurationException { + // Get value of parallel node from the configuration - defaults to false + this.defaultParallel = configuration.getChild("parallel").getValueAsBoolean(false); + } + /** * <p>Setup this component instance in the context of its pipeline and * current
request.</p> * * @see Serviceable#service(ServiceManager) */ - public void setup(SourceResolver resolver, Map om, String src, Parameters parameters) + public void setup(SourceResolver resolver, Map om, String src, Parameters parameters) throws ProcessingException, SAXException, IOException { + // Read parameters + this.m_parallel = parameters.getParameterAsBoolean("parallel", this.defaultParallel); + + // Init transformer state this.m_namespaces = new NamespacesTable(); this.m_resolver = resolver; this.m_validity = null; this.x_parameters = null; this.x_value = null; + this.m_buffering = false; } /** @@ -161,11 +241,13 @@ * @see org.apache.avalon.excalibur.pool.Recyclable#recycle() */ public void recycle() { - this.m_namespaces = new NamespacesTable(); + this.m_namespaces = null; this.m_resolver = null; this.m_validity = null; this.x_parameters = null; this.x_value = null; + this.m_buffering = false; + this.m_buffer = null; super.recycle(); } @@ -177,7 +259,8 @@ public void startDocument() throws SAXException { /* Make sure that we have a validity while processing */ - this.getValidity(); + getValidity(); + super.startDocument(); } @@ -190,6 +273,12 @@ throws SAXException { /* Make sure that the validity is "closed" at the end */ this.m_validity.close(); + + /* This is the end of the line - process the buffered events */ + if (this.m_buffering) { + this.m_buffer.toSAX(super.contentHandler); + } + super.endDocument(); } @@ -200,7 +289,7 @@ * prefixes associated with the <code>http://apache.org/cocoon/include/1.0</code> * namespace.</p> * - * @see org.xml.sax.ContentHandler#startPrefixMapping(String) + * @see org.xml.sax.ContentHandler#startPrefixMapping(String, String) */ public void startPrefixMapping(String prefix, String nsuri) throws SAXException { @@ -209,7 +298,11 @@ this.m_namespaces.addDeclaration(prefix, nsuri); } else { /* Map the current prefix, as we don't know it */ - super.startPrefixMapping(prefix, nsuri); + if (this.m_buffering) { + 
m_buffer.startPrefixMapping(prefix, nsuri); + } else { + super.startPrefixMapping(prefix, nsuri); + } } } @@ -229,7 +322,11 @@ this.m_namespaces.removeDeclaration(prefix); } else { /* Unmap the current prefix, as we don't know it */ - super.endPrefixMapping(prefix); + if (this.m_buffering) { + m_buffer.endPrefixMapping(prefix); + } else { + super.endPrefixMapping(prefix); + } } } @@ -242,12 +339,18 @@ throws SAXException { /* If we have a parameter value to add to, let's add this chunk */ if (this.x_parameter != null) { - if (this.x_value == null) this.x_value = new StringBuffer(); + if (this.x_value == null) { + this.x_value = new StringBuffer(); + } this.x_value.append(data, offset, length); /* Forward this only if we are not inside an include tag */ } else if (this.x_source == null) { - super.characters(data, offset, length); + if (this.m_buffering) { + m_buffer.characters(data, offset, length); + } else { + super.characters(data, offset, length); + } } } @@ -256,7 +359,7 @@ * * @see org.xml.sax.ContentHandler#startElement(String, String, String, Attributes) */ - public void startElement(String uri, String localName, String qName, Attributes atts) + public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException { /* Check the namespace declaration */ if (NS_URI.equals(uri)) { @@ -267,13 +370,13 @@ throw new SAXException("Invalid include nested in another"); } - /* Remember the source we are trying to include */ + /* Remember the source we are trying to include */ this.x_source = atts.getValue(SRC_ATTRIBUTE); if ((this.x_source == null) || (this.x_source.length() == 0)) { throw new SAXException("Attribute \"" + SRC_ATTRIBUTE + "\" not specified"); } - + /* Whatever list of parameters we got before, we wipe it! */ this.x_parameters = null; this.x_value = null; @@ -282,7 +385,7 @@ /* Done with this element */ return; } - + /* If this is a parameter, then make sure we prepare. 
*/ if (PARAMETER_ELEMENT.equals(localName)) { /* Check if we are in the right context */ @@ -296,7 +399,7 @@ /* Get and process the parameter name */ this.x_parameter = atts.getValue(NAME_ATTRIBUTE); if ((this.x_parameter == null) || (this.x_parameter.length() == 0)) { - throw new SAXException("Attribute \"" + NAME_ATTRIBUTE + throw new SAXException("Attribute \"" + NAME_ATTRIBUTE + "\" not specified"); } @@ -307,17 +410,21 @@ /* Done with this element */ return; } - + /* We don't have a clue of why we got here (wrong element?) */ - if (this.getLogger().isWarnEnabled()) { - this.getLogger().warn("Unknown element \"" + localName + "\""); + if (getLogger().isWarnEnabled()) { + getLogger().warn("Unknown element \"" + localName + "\""); } return; } /* Not our namespace, simply check and pass this element on! */ if (this.x_source == null) { - super.startElement(uri, localName, qName, atts); + if (this.m_buffering) { + m_buffer.startElement(uri, localName, qName, atts); + } else { + super.startElement(uri, localName, qName, atts); + } return; } throw new SAXException("Element <" + qName + "/> invalid inside include"); @@ -332,7 +439,7 @@ throws SAXException { /* Check the namespace declaration */ if (NS_URI.equals(uri)) { - + /* Inclusion will happen here, when we close the include element */ if (INCLUDE_ELEMENT.equals(localName)) { @@ -345,8 +452,16 @@ } source = this.m_resolver.resolveURI(this.x_source); if (this.m_validity != null) this.m_validity.addSource(source); - SourceUtil.toSAX(this.m_manager, source, "text/xml", - new IncludeXMLConsumer(super.contentHandler)); + + /* Check for parallel processing */ + if (this.m_parallel) { + this.m_buffering = true; + if (m_buffer == null) m_buffer = new SaxBuffer(); + m_buffer.xmlizable(new IncludeBuffer(source)); + } else { + SourceUtil.toSAX(this.m_manager, source, "text/xml", + new IncludeXMLConsumer(super.contentHandler)); + } } catch (IOException e) { /* Something bad happenend processing a stream */ throw new 
SAXException(e); @@ -354,10 +469,13 @@ /* Something bad happened processing a pipeline */ throw new SAXException(e); } finally { - /* In any case, make sure we release the source */ - if (source != null) this.m_resolver.release(source); + /* Make sure we release the source if we aren't in parallel mode. + In parallel mode, the spawned thread releases the source after processing */ + if (!this.m_buffering && source != null) { + this.m_resolver.release(source); + } } - + /* We are done with the include element */ this.x_parameters = null; this.x_value = null; @@ -372,28 +490,32 @@ /* Store the parameter name and value */ try { - /* + /* * Note: the parameter name and value are URL encoded, so that * weird characters such as "&" or "=" (have special meaning) * are passed through flawlessly. */ - if (this.x_parameters == null) this.x_parameters = new HashMap(); + if (this.x_parameters == null) this.x_parameters = new HashMap(5); this.x_parameters.put(URLEncoder.encode(this.x_parameter, ENCODING), URLEncoder.encode(value, ENCODING)); } catch (UnsupportedEncodingException e) { - throw new SAXException("Your platform does not support the " - + ENCODING + " encoding", e); + throw new SAXException("Your platform does not support the " + + ENCODING + " encoding", e); } - + /* We are done with this parameter element */ this.x_value = null; this.x_parameter = null; return; } - + } else { /* This is not our namespace, pass the event on! */ - super.endElement(uri, localName, qName); + if (this.m_buffering) { + m_buffer.endElement(uri, localName, qName); + } else { + super.endElement(uri, localName, qName); + } } } @@ -422,5 +544,73 @@ m_validity = new MultiSourceValidity(m_resolver, -1); } return m_validity; + } + + /** + * Buffer for loading an included source in a separate thread. + * Streaming of the loaded buffer is possible only when the source is + * loaded completely. If loading is not complete, the toSAX method
+ */ + private class IncludeBuffer extends SaxBuffer implements Runnable { + private Source source; + private boolean finished; + private SAXException e; + + public IncludeBuffer(Source source) { + this.source = source; + // FIXME Need thread pool component. Based on EDU.oswego.cs.dl.util.concurrent.PooledExecutor. + // See also org.apache.cocoon.components.cron.QuartzJobScheduler.ThreadPool + Thread t = new Thread(this); + t.setName("IncludeSource#" + source.getURI()); + t.setDaemon(true); + t.start(); + } + + public synchronized void toSAX(ContentHandler contentHandler) + throws SAXException { + if (!finished) { + try { + wait(); + } catch (InterruptedException ignored) { } + } + + if (e != null) { + throw e; + } + + super.toSAX(contentHandler); + } + + public void run() { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Loading <" + source.getURI() + ">"); + } + + try { + SourceUtil.toSAX(m_manager, source, "text/xml", new EmbeddedXMLPipe(this)); + } catch (Exception e) { + if (!(e instanceof SAXException)) { + this.e = new SAXException(e); + } else { + this.e = (SAXException) e; + } + } finally { + synchronized (this) { + finished = true; + notify(); + } + + m_resolver.release(source); + } + + if (getLogger().isDebugEnabled()) { + if (this.e == null) { + getLogger().debug("Loaded <" + source.getURI() + ">"); + } else { + getLogger().debug("Failed to load <" + source.getURI() + ">", this.e); + } + } + } } } Modified: cocoon/branches/BRANCH_2_1_X/status.xml ============================================================================== --- cocoon/branches/BRANCH_2_1_X/status.xml (original) +++ cocoon/branches/BRANCH_2_1_X/status.xml Fri Oct 8 06:26:15 2004 @@ -205,6 +205,10 @@ <changes> <release version="@version@" date="@date@"> + <action dev="VG" type="add"> + Scratchpad block: Implemented support of parallel loading of included + sources in the IncludeTransformer. + </action> <action dev="AG" type="update"> Updated commons-lang to 2.0-20041007T2305. 
Now cocoon compiles in java 1.5 </action>