Author: vgritsenko
Date: Fri Oct  8 06:26:15 2004
New Revision: 54077

Modified:
   
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
   cocoon/branches/BRANCH_2_1_X/status.xml
Log:
implement parallel (multi threaded) loading of included sources


Modified: 
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
==============================================================================
--- 
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
    (original)
+++ 
cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
    Fri Oct  8 06:26:15 2004
@@ -1,12 +1,12 @@
 /*
  * Copyright 1999-2004 The Apache Software Foundation.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,39 +15,46 @@
  */
 package org.apache.cocoon.transformation;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.HashMap;
-import java.util.Map;
-
+import org.apache.avalon.framework.configuration.Configurable;
+import org.apache.avalon.framework.configuration.Configuration;
+import org.apache.avalon.framework.configuration.ConfigurationException;
 import org.apache.avalon.framework.parameters.Parameters;
 import org.apache.avalon.framework.service.ServiceException;
 import org.apache.avalon.framework.service.ServiceManager;
 import org.apache.avalon.framework.service.Serviceable;
+
 import org.apache.cocoon.ProcessingException;
 import org.apache.cocoon.caching.CacheableProcessingComponent;
 import org.apache.cocoon.components.source.SourceUtil;
 import org.apache.cocoon.components.source.impl.MultiSourceValidity;
 import org.apache.cocoon.environment.SourceResolver;
 import org.apache.cocoon.util.NetUtils;
+import org.apache.cocoon.xml.EmbeddedXMLPipe;
 import org.apache.cocoon.xml.IncludeXMLConsumer;
 import org.apache.cocoon.xml.NamespacesTable;
+import org.apache.cocoon.xml.SaxBuffer;
+
 import org.apache.excalibur.source.Source;
 import org.apache.excalibur.source.SourceValidity;
 import org.xml.sax.Attributes;
+import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
- * <p>A simple transformer including resolvable sources (accessed through 
Cocoon's
- * [EMAIL PROTECTED] SourceResolver} from its input.</p>
- * 
+ * <p>A simple transformer including resolvable sources (accessed through
+ * Cocoon's {@link SourceResolver} from its input.</p>
+ *
  * <p>Inclusion is triggered by the <code>&lt;include ... /&gt;</code> element
  * defined in the <code>http://apache.org/cocoon/include/1.0</code> 
namespace.</p>
- * 
- * <p>Example:</p>
  *
+ * <p>Example:</p>
  * <pre>
  * &lt;incl:include xmlns:incl="http://apache.org/cocoon/include/1.0"
  *               src="cocoon://path/to/include"/&gt;
@@ -55,7 +62,7 @@
  *
  * <p>Parameters to be passed to the included sources can be specified in two 
ways:
  * the first one is to encode them onto the source itelf, for example:</p>
- * 
+ *
  * <pre>
  * &lt;incl:include xmlns:incl="http://apache.org/cocoon/include/1.0"
  *               src="cocoon://path/to/include?paramA=valueA&amp;paramB=valueB"/&gt;
@@ -65,7 +72,7 @@
  * the transformer, so that one can easily pass parameter name or values 
containing
  * the <code>&</code> (amperstand) or <code>=</code> (equals) character, which 
are
  * reserved characters in URIs. An example:</p>
- * 
+ *
  * <pre>
  * &lt;incl:include xmlns:incl="http://apache.org/cocoon/include/1.0"
  *               src="cocoon://path/to/include"&gt;
@@ -73,13 +80,28 @@
  *   &lt;incl:parameter name="other&amp;Para=Name" 
value="other=Para&amp;Value"/&gt;
  * &lt;/incl:include&gt;
  * </pre>
- * 
+ *
  * <p>An interesting feature of this {@link Transformer} is that it implements the
  * {@link CacheableProcessingComponent} interface and provides full support for
  * caching. In other words, if the input given to this transformer has not changed,
  * and all of the included sources are (cacheable) and still valid, this transformer
  * will not force a pipeline re-generation like the {@link CIncludeTransformer}.</p>
- * 
+ *
+ * <h3>Parallel Processing</h3>
+ * <p>Another feature of this {@link Transformer} is that it allows parallel processing
+ * of includes. By setting the optional parameter <code>parallel</code> to true,
+ * the various included contents are processed (included) in parallel threads rather
+ * than in series, in one thread. This parameter can be set in either the transformer
+ * definition (to affect all IncludeTransformer instances):</p>
+ * <pre>
+ * &lt;parallel&gt;true&lt;/parallel&gt;
+ * </pre>
+ * <p>or in a pipeline itself (to only affect that instance of the 
IncludeTransformer):</p>
+ * <pre>
+ * &lt;map:parameter name="parallel" value="true"/&gt;
+ * </pre>
+ * <p>It defaults to "false".</p>
+ *
  * @cocoon.sitemap.component.name   include
  * @cocoon.sitemap.component.logger sitemap.transformer.include
  *
@@ -87,48 +109,93 @@
  * @cocoon.sitemap.component.pooling.max  16
  * @cocoon.sitemap.component.pooling.grow  2
  */
-public class IncludeTransformer extends AbstractTransformer 
-implements Serviceable, Transformer, CacheableProcessingComponent {
+public class IncludeTransformer extends AbstractTransformer
+                                implements Serviceable, Configurable,
+                                           Transformer, 
CacheableProcessingComponent {
 
     /** <p>The namespace URI of the elements recognized by this 
transformer.</p> */
     private static final String NS_URI = 
"http://apache.org/cocoon/include/1.0";;
+
     /** <p>The name of the element triggering inclusion of sources.</p> */
     private static final String INCLUDE_ELEMENT = "include";
+
     /** <p>The name of the element defining an included subrequest 
parameter.</p> */
     private static final String PARAMETER_ELEMENT = "parameter";
+
     /** <p>The name of the attribute indicating the included source URI.</p> */
     private static final String SRC_ATTRIBUTE = "src";
+
     /** <p>The name of the attribute indicating the parameter name.</p> */
     private static final String NAME_ATTRIBUTE = "name";
+
     /** <p>The name of the attribute indicating the parameter name.</p> */
     private static final String VALUE_ATTRIBUTE = "value";
-    
+
     /** <p>The encoding to use for parameter names and values.</p> */
     private static final String ENCODING = "US-ASCII";
 
+
     /** <p>The {@link ServiceManager} instance associated with this instance.</p> */
     private ServiceManager m_manager;
+
+    /**
+     * <p>Configuration option controlling parallel (in multiple threads)
+     * includes processing</p>
+     */
+    private boolean defaultParallel;
+
+    //
+    // Current configuration
+    //
+
     /** <p>The {@link SourceResolver} used to resolve included URIs.</p> */
     private SourceResolver m_resolver;
+
+    /**
+     * <p>Pipeline parameter controlling parallel (in multiple threads)
+     * includes processing</p>
+     */
+    private boolean m_parallel;
+
+    //
+    // Current state
+    //
+
     /** <p>The {@link SourceValidity} instance associated with this request.</p> */
     private MultiSourceValidity m_validity;
+
     /** <p>A {@link NamespacesTable} used to filter namespace declarations.</p> */
     private NamespacesTable m_namespaces;
 
     /** <p>A {@link Map} of the parameters to supply to the included source.</p> */
     private Map x_parameters;
+
     /** <p>The source to be included declared in an include element.</p> */
-    private String x_source; 
+    private String x_source;
+
     /** <p>The current parameter name captured.</p> */
     private String x_parameter;
+
     /** <p>The current parameter value (as a {@link StringBuffer}).</p> */
     private StringBuffer x_value;
-    
+
+    /**
+     * <p>If parallel processing is enabled, then this boolean tells us
+     * whether buffering has started yet.</p>
+     */
+    private boolean m_buffering;
+
+    /**
+     * <p>The {@link SaxBuffer} used to buffer events if parallel
+     * processing is turned on.</p>
+     */
+    private SaxBuffer m_buffer;
+
+
     /**
      * <p>Create a new {@link IncludeTransformer} instance.</p>
      */
     public IncludeTransformer() {
-        super();
     }
 
     /**
@@ -140,19 +207,32 @@
         this.m_manager = manager;
     }
 
+    /* (non-Javadoc)
+     * @see Configurable#configure(Configuration)
+     */
+    public void configure(Configuration configuration) throws 
ConfigurationException {
+        // Get value of parallel node from the configuration - defaults to 
false
+        this.defaultParallel = 
configuration.getChild("parallel").getValueAsBoolean(false);
+    }
+
     /**
      * <p>Setup this component instance in the context of its pipeline and
      * current request.</p>
      *
      * @see Serviceable#service(ServiceManager)
      */
-    public void setup(SourceResolver resolver, Map om, String src, Parameters 
parameters) 
+    public void setup(SourceResolver resolver, Map om, String src, Parameters 
parameters)
     throws ProcessingException, SAXException, IOException {
+        // Read parameters
+        this.m_parallel = parameters.getParameterAsBoolean("parallel", 
this.defaultParallel);
+
+        // Init transformer state
         this.m_namespaces = new NamespacesTable();
         this.m_resolver = resolver;
         this.m_validity = null;
         this.x_parameters = null;
         this.x_value = null;
+        this.m_buffering = false;
     }
 
     /**
@@ -161,11 +241,13 @@
      * @see org.apache.avalon.excalibur.pool.Recyclable#recycle()
      */
     public void recycle() {
-        this.m_namespaces = new NamespacesTable();
+        this.m_namespaces = null;
         this.m_resolver = null;
         this.m_validity = null;
         this.x_parameters = null;
         this.x_value = null;
+        this.m_buffering = false;
+        this.m_buffer = null;
         super.recycle();
     }
 
@@ -177,7 +259,8 @@
     public void startDocument()
     throws SAXException {
         /* Make sure that we have a validity while processing */
-        this.getValidity();
+        getValidity();
+
         super.startDocument();
     }
 
@@ -190,6 +273,12 @@
     throws SAXException {
         /* Make sure that the validity is "closed" at the end */
         this.m_validity.close();
+
+        /* This is the end of the line - process the buffered events */
+        if (this.m_buffering) {
+            this.m_buffer.toSAX(super.contentHandler);
+        }
+
         super.endDocument();
     }
 
@@ -200,7 +289,7 @@
      * prefixes associated with the 
<code>http://apache.org/cocoon/include/1.0</code>
      * namespace.</p>
      *
-     * @see org.xml.sax.ContentHandler#startPrefixMapping(String)
+     * @see org.xml.sax.ContentHandler#startPrefixMapping(String, String)
      */
     public void startPrefixMapping(String prefix, String nsuri)
     throws SAXException {
@@ -209,7 +298,11 @@
             this.m_namespaces.addDeclaration(prefix, nsuri);
         } else {
             /* Map the current prefix, as we don't know it */
-            super.startPrefixMapping(prefix, nsuri);
+            if (this.m_buffering) {
+                m_buffer.startPrefixMapping(prefix, nsuri);
+            } else {
+                super.startPrefixMapping(prefix, nsuri);
+            }
         }
     }
 
@@ -229,7 +322,11 @@
             this.m_namespaces.removeDeclaration(prefix);
         } else {
             /* Unmap the current prefix, as we don't know it */
-            super.endPrefixMapping(prefix);
+            if (this.m_buffering) {
+                m_buffer.endPrefixMapping(prefix);
+            } else {
+                super.endPrefixMapping(prefix);
+            }
         }
     }
 
@@ -242,12 +339,18 @@
     throws SAXException {
         /* If we have a parameter value to add to, let's add this chunk */
         if (this.x_parameter != null) {
-            if (this.x_value == null) this.x_value = new StringBuffer();
+            if (this.x_value == null) {
+                this.x_value = new StringBuffer();
+            }
             this.x_value.append(data, offset, length);
 
             /* Forward this only if we are not inside an include tag */
         } else if (this.x_source == null) {
-            super.characters(data, offset, length);
+            if (this.m_buffering) {
+                m_buffer.characters(data, offset, length);
+            } else {
+                super.characters(data, offset, length);
+            }
         }
     }
 
@@ -256,7 +359,7 @@
      *
      * @see org.xml.sax.ContentHandler#startElement(String, String, String, 
Attributes)
      */
-    public void startElement(String uri, String localName, String qName, 
Attributes atts) 
+    public void startElement(String uri, String localName, String qName, 
Attributes atts)
     throws SAXException {
         /* Check the namespace declaration */
         if (NS_URI.equals(uri)) {
@@ -267,13 +370,13 @@
                     throw new SAXException("Invalid include nested in 
another");
                 }
 
-                /* Remember the source we are trying to include */ 
+                /* Remember the source we are trying to include */
                 this.x_source = atts.getValue(SRC_ATTRIBUTE);
                 if ((this.x_source == null) || (this.x_source.length() == 0)) {
                     throw new SAXException("Attribute \"" + SRC_ATTRIBUTE
                                            + "\" not specified");
                 }
-                
+
                 /* Whatever list of parameters we got before, we wipe it! */
                 this.x_parameters = null;
                 this.x_value = null;
@@ -282,7 +385,7 @@
                 /* Done with this element */
                 return;
             }
-            
+
             /* If this is a parameter, then make sure we prepare. */
             if (PARAMETER_ELEMENT.equals(localName)) {
                 /* Check if we are in the right context */
@@ -296,7 +399,7 @@
                 /* Get and process the parameter name */
                 this.x_parameter = atts.getValue(NAME_ATTRIBUTE);
                 if ((this.x_parameter == null) || (this.x_parameter.length() 
== 0)) {
-                    throw new SAXException("Attribute \"" + NAME_ATTRIBUTE 
+                    throw new SAXException("Attribute \"" + NAME_ATTRIBUTE
                                            + "\" not specified");
                 }
 
@@ -307,17 +410,21 @@
                 /* Done with this element */
                 return;
             }
-            
+
             /* We don't have a clue of why we got here (wrong element?) */
-            if (this.getLogger().isWarnEnabled()) {
-                this.getLogger().warn("Unknown element \"" + localName + "\"");
+            if (getLogger().isWarnEnabled()) {
+                getLogger().warn("Unknown element \"" + localName + "\"");
             }
             return;
         }
 
         /* Not our namespace, simply check and pass this element on! */
         if (this.x_source == null) {
-            super.startElement(uri, localName, qName, atts);
+            if (this.m_buffering) {
+                m_buffer.startElement(uri, localName, qName, atts);
+            } else {
+                super.startElement(uri, localName, qName, atts);
+            }
             return;
         }
         throw new SAXException("Element <" + qName + "/> invalid inside 
include");
@@ -332,7 +439,7 @@
     throws SAXException {
         /* Check the namespace declaration */
         if (NS_URI.equals(uri)) {
-            
+
             /* Inclusion will happen here, when we close the include element */
             if (INCLUDE_ELEMENT.equals(localName)) {
 
@@ -345,8 +452,16 @@
                     }
                     source = this.m_resolver.resolveURI(this.x_source);
                     if (this.m_validity != null) 
this.m_validity.addSource(source);
-                    SourceUtil.toSAX(this.m_manager, source, "text/xml", 
-                                     new 
IncludeXMLConsumer(super.contentHandler));
+
+                    /* Check for parallel processing */
+                    if (this.m_parallel) {
+                        this.m_buffering = true;
+                        if (m_buffer == null) m_buffer = new SaxBuffer();
+                        m_buffer.xmlizable(new IncludeBuffer(source));
+                    } else {
+                        SourceUtil.toSAX(this.m_manager, source, "text/xml",
+                                         new 
IncludeXMLConsumer(super.contentHandler));
+                    }
                 } catch (IOException e) {
                     /* Something bad happenend processing a stream */
                     throw new SAXException(e);
@@ -354,10 +469,13 @@
                     /* Something bad happened processing a pipeline */
                     throw new SAXException(e);
                 } finally {
-                    /* In any case, make sure we release the source */
-                    if (source != null) this.m_resolver.release(source);
+                    /* Make sure we release the source if we aren't in parallel mode.
+                       In parallel mode, the spawned thread releases the source after processing */
+                    if (!this.m_buffering && source != null) {
+                        this.m_resolver.release(source);
+                    }
                 }
-                
+
                 /* We are done with the include element */
                 this.x_parameters = null;
                 this.x_value = null;
@@ -372,28 +490,32 @@
 
                 /* Store the parameter name and value */
                 try {
-                    /* 
+                    /*
                      * Note: the parameter name and value are URL encoded, so 
that
                      * weird characters such as "&" or "=" (have special 
meaning)
                      * are passed through flawlessly.
                      */
-                    if (this.x_parameters == null) this.x_parameters = new 
HashMap();
+                    if (this.x_parameters == null) this.x_parameters = new 
HashMap(5);
                     this.x_parameters.put(URLEncoder.encode(this.x_parameter, 
ENCODING),
                                           URLEncoder.encode(value, ENCODING));
                 } catch (UnsupportedEncodingException e) {
-                    throw new SAXException("Your platform does not support the 
"
-                                           + ENCODING + " encoding", e);
+                    throw new SAXException("Your platform does not support the 
" +
+                                           ENCODING + " encoding", e);
                 }
-                
+
                 /* We are done with this parameter element */
                 this.x_value = null;
                 this.x_parameter = null;
                 return;
             }
-            
+
         } else {
             /* This is not our namespace, pass the event on! */
-            super.endElement(uri, localName, qName);
+            if (this.m_buffering) {
+                m_buffer.endElement(uri, localName, qName);
+            } else {
+                super.endElement(uri, localName, qName);
+            }
         }
     }
 
@@ -422,5 +544,73 @@
             m_validity = new MultiSourceValidity(m_resolver, -1);
         }
         return m_validity;
+    }
+
+    /**
+     * Buffer for loading an included source in a separate thread.
+     *
+     * <p>Loading starts as soon as the buffer is constructed. The recorded
+     * events can be streamed only once the source has been loaded
+     * completely; until then {@link #toSAX(ContentHandler)} blocks.</p>
+     */
+    private class IncludeBuffer extends SaxBuffer implements Runnable {
+        /** The source loaded (and finally released) by the loader thread. */
+        private final Source source;
+
+        /** URI of the source, captured up front so it can be logged after release. */
+        private final String uri;
+
+        /**
+         * Resolver captured at construction time: the transformer's own
+         * reference is cleared in recycle(), which may run before the loader
+         * thread gets to release the source.
+         */
+        private final SourceResolver resolver;
+
+        /** Set (under this buffer's monitor) once the loader thread is done. */
+        private boolean finished;
+
+        /** Failure raised while loading, rethrown to the caller of toSAX. */
+        private SAXException e;
+
+        /**
+         * Create the buffer and immediately start loading the given source
+         * in a new daemon thread.
+         *
+         * @param source the resolved source to load; released by the loader thread.
+         */
+        public IncludeBuffer(Source source) {
+            this.source = source;
+            this.uri = source.getURI();
+            this.resolver = m_resolver;
+            // FIXME Need thread pool component. Based on EDU.oswego.cs.dl.util.concurrent.PooledExecutor.
+            //       See also org.apache.cocoon.components.cron.QuartzJobScheduler.ThreadPool
+            Thread t = new Thread(this);
+            t.setName("IncludeSource#" + this.uri);
+            t.setDaemon(true);
+            t.start();
+        }
+
+        /**
+         * Stream the loaded content, blocking until loading has completed.
+         *
+         * @param contentHandler the handler receiving the buffered events.
+         * @throws SAXException if loading failed, if the waiting thread was
+         *         interrupted, or if the handler fails while streaming.
+         */
+        public synchronized void toSAX(ContentHandler contentHandler)
+        throws SAXException {
+            // Loop rather than test once: wait() may return spuriously.
+            while (!finished) {
+                try {
+                    wait();
+                } catch (InterruptedException ie) {
+                    // Never stream a buffer the loader is still writing to:
+                    // restore the interrupt status and fail this include.
+                    Thread.currentThread().interrupt();
+                    throw new SAXException("Interrupted while loading <" + uri + ">", ie);
+                }
+            }
+
+            if (e != null) {
+                throw e;
+            }
+
+            super.toSAX(contentHandler);
+        }
+
+        /**
+         * Loader thread body: record the source's SAX events into this
+         * buffer, release the source, then wake up any thread in toSAX.
+         */
+        public void run() {
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug("Loading <" + uri + ">");
+            }
+
+            SAXException failure = null;
+            try {
+                SourceUtil.toSAX(m_manager, source, "text/xml", new EmbeddedXMLPipe(this));
+            } catch (SAXException se) {
+                failure = se;
+            } catch (Exception ex) {
+                failure = new SAXException(ex);
+            } finally {
+                try {
+                    // Release before signalling completion, so the source is
+                    // given back while the transformer is still set up.
+                    resolver.release(source);
+                } finally {
+                    synchronized (this) {
+                        this.e = failure;
+                        finished = true;
+                        notifyAll();
+                    }
+                }
+            }
+
+            if (getLogger().isDebugEnabled()) {
+                if (failure == null) {
+                    getLogger().debug("Loaded <" + uri + ">");
+                } else {
+                    getLogger().debug("Failed to load <" + uri + ">", failure);
+                }
+            }
+        }
     }
 }

Modified: cocoon/branches/BRANCH_2_1_X/status.xml
==============================================================================
--- cocoon/branches/BRANCH_2_1_X/status.xml     (original)
+++ cocoon/branches/BRANCH_2_1_X/status.xml     Fri Oct  8 06:26:15 2004
@@ -205,6 +205,10 @@
 
   <changes>
  <release version="@version@" date="@date@">
+   <action dev="VG" type="add">
+     Scratchpad block: Implemented support of parallel loading of included
+     sources in the IncludeTransformer.
+   </action>
    <action dev="AG" type="update">
      Updated commons-lang to 2.0-20041007T2305. Now cocoon compiles in java 1.5
    </action>

Reply via email to