Author: davsclaus
Date: Mon Jul 14 04:16:00 2008
New Revision: 676550

URL: http://svn.apache.org/viewvc?rev=676550&view=rev
Log:
CAMEL-702: A proposed fix for CAMEL-702. Gert will take a look too. Concurrency 
issue with camel-saxon. WORK IN PROGRESS!!

Modified:
    
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
    
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
    
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java

Modified: 
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java?rev=676550&r1=676549&r2=676550&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
 (original)
+++ 
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
 Mon Jul 14 04:16:00 2008
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.xml.transform.Result;
 import javax.xml.transform.Source;
 import javax.xml.transform.dom.DOMResult;
@@ -78,6 +79,7 @@
     private ResultFormat resultsFormat = ResultFormat.DOM;
     private Properties properties = new Properties();
     private Class resultType;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     @Override
     public String toString() {
@@ -93,10 +95,13 @@
     }
 
     public Object evaluate(Exchange exchange) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Evaluation " + expression + " for exchange: " + 
exchange);
-        }
         try {
+            initialize();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Evaluation " + expression + " for exchange: " + 
exchange);
+            }
+
             if (resultType != null) {
                 if (resultType.equals(String.class)) {
                     return evaluateAsString(exchange);
@@ -128,28 +133,29 @@
         }
     }
 
-    /**
-     * Configures the namespace context from the given DOM element
-     */
-    public void setNamespaces(Map<String, String> namespaces) {
-        namespacePrefixes.putAll(namespaces);
-    }
-
     public List evaluateAsList(Exchange exchange) throws Exception {
+        initialize();
+
         return getExpression().evaluate(createDynamicContext(exchange));
     }
 
     public Object evaluateAsStringSource(Exchange exchange) throws Exception {
+        initialize();
+
         String text = evaluateAsString(exchange);
         return new StringSource(text);
     }
 
     public Object evaluateAsBytesSource(Exchange exchange) throws Exception {
+        initialize();
+
         byte[] bytes = evaluateAsBytes(exchange);
         return new BytesSource(bytes);
     }
 
     public Node evaluateAsDOM(Exchange exchange) throws Exception {
+        initialize();
+
         DOMResult result = new DOMResult();
         DynamicQueryContext context = createDynamicContext(exchange);
         XQueryExpression expression = getExpression();
@@ -158,6 +164,8 @@
     }
 
     public byte[] evaluateAsBytes(Exchange exchange) throws Exception {
+        initialize();
+
         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
         Result result = new StreamResult(buffer);
         getExpression().pull(createDynamicContext(exchange), result, 
properties);
@@ -166,6 +174,8 @@
     }
 
     public String evaluateAsString(Exchange exchange) throws Exception {
+        initialize();
+        
         StringWriter buffer = new StringWriter();
         SequenceIterator iter = 
getExpression().iterator(createDynamicContext(exchange));
         for (Item item = iter.next(); item != null; item = iter.next()) {
@@ -198,7 +208,8 @@
     //-------------------------------------------------------------------------
     public static XQueryBuilder xquery(final String queryText) {
         return new XQueryBuilder() {
-            protected XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext) throws 
XPathException {
+            protected XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext)
+                throws XPathException {
                 return staticQueryContext.compileQuery(queryText);
             }
         };
@@ -206,7 +217,8 @@
 
     public static XQueryBuilder xquery(final Reader reader) {
         return new XQueryBuilder() {
-            protected XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext) throws 
XPathException, IOException {
+            protected XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext)
+                throws XPathException, IOException {
                 return staticQueryContext.compileQuery(reader);
             }
         };
@@ -214,7 +226,8 @@
 
     public static XQueryBuilder xquery(final InputStream in, final String 
characterSet) {
         return new XQueryBuilder() {
-            protected XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext) throws 
XPathException, IOException {
+            protected XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext)
+                throws XPathException, IOException {
                 return staticQueryContext.compileQuery(in, characterSet);
             }
         };
@@ -245,6 +258,8 @@
 
     public XQueryBuilder namespace(String prefix, String uri) {
         namespacePrefixes.put(prefix, uri);
+        // more namespace, we must re initialize
+        initialized.set(false);
         return this;
     }
 
@@ -291,42 +306,37 @@
     // Properties
     // 
-------------------------------------------------------------------------
 
-    public synchronized XQueryExpression getExpression() throws IOException, 
XPathException {
-        if (expression == null) {
-            expression = createQueryExpression(getStaticQueryContext());
-            clearBuilderReferences();
-        }
+    /**
+     * Configures the namespace context from the given DOM element
+     */
+    public void setNamespaces(Map<String, String> namespaces) {
+        namespacePrefixes.putAll(namespaces);
+        // more namespace, we must re initialize
+        initialized.set(false);
+    }
+
+    public XQueryExpression getExpression() throws IOException, XPathException 
{
         return expression;
     }
 
-    public synchronized Configuration getConfiguration() {
-        if (configuration == null) {
-            configuration = new Configuration();
-            configuration.setHostLanguage(Configuration.XQUERY);
-        }
+    public Configuration getConfiguration() {
         return configuration;
     }
 
     public void setConfiguration(Configuration configuration) {
         this.configuration = configuration;
+        // change configuration, we must re intialize
+        initialized.set(false);
     }
 
-    public synchronized StaticQueryContext getStaticQueryContext() {
-        if (staticQueryContext == null) {
-            staticQueryContext = new StaticQueryContext(getConfiguration());
-            Set<Map.Entry<String, String>> entries = 
namespacePrefixes.entrySet();
-            for (Map.Entry<String, String> entry : entries) {
-                String prefix = entry.getKey();
-                String uri = entry.getValue();
-                staticQueryContext.declareNamespace(prefix, uri);
-                staticQueryContext.setInheritNamespaces(true);
-            }
-        }
+    public StaticQueryContext getStaticQueryContext() {
         return staticQueryContext;
     }
 
     public void setStaticQueryContext(StaticQueryContext staticQueryContext) {
         this.staticQueryContext = staticQueryContext;
+        // change context, we must re intialize
+        initialized.set(false);
     }
 
     public Map<String, Object> getParameters() {
@@ -367,7 +377,8 @@
     /**
      * A factory method to create the XQuery expression
      */
-    protected abstract XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext) throws 
XPathException, IOException;
+    protected abstract XQueryExpression 
createQueryExpression(StaticQueryContext staticQueryContext)
+        throws XPathException, IOException;
 
     /**
      * Creates a dynamic context for the given exchange
@@ -404,7 +415,8 @@
      * @param exchange
      * @throws Exception
      */
-    protected void configureQuery(DynamicQueryContext dynamicQueryContext, 
Exchange exchange) throws Exception {
+    protected void configureQuery(DynamicQueryContext dynamicQueryContext, 
Exchange exchange)
+        throws Exception {
         addParameters(dynamicQueryContext, exchange.getProperties());
         addParameters(dynamicQueryContext, exchange.getIn().getHeaders());
         addParameters(dynamicQueryContext, getParameters());
@@ -422,17 +434,35 @@
         }
     }
 
+    protected boolean matches(Exchange exchange, List results) {
+        return ObjectHelper.matches(results);
+    }
+
     /**
-     * To avoid keeping around any unnecessary objects after the expression has
-     * been created lets nullify references here
+     * Initializes this builder - <b>Must be invoked before evaluation</b>.
      */
-    protected void clearBuilderReferences() {
-        staticQueryContext = null;
-        configuration = null;
-    }
+    protected synchronized void initialize() throws XPathException, 
IOException {
+        // must use synchronized for concurrency issues and only let it 
intialize once
+        if (!initialized.get()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Initializing XQueryBuilder " + this);
+            }
+            configuration = new Configuration();
+            configuration.setHostLanguage(Configuration.XQUERY);
 
-    protected boolean matches(Exchange exchange, List results) {
-        return ObjectHelper.matches(results);
+            staticQueryContext = new StaticQueryContext(getConfiguration());
+            Set<Map.Entry<String, String>> entries = 
namespacePrefixes.entrySet();
+            for (Map.Entry<String, String> entry : entries) {
+                String prefix = entry.getKey();
+                String uri = entry.getValue();
+                staticQueryContext.declareNamespace(prefix, uri);
+                staticQueryContext.setInheritNamespaces(true);
+            }
+
+            expression = createQueryExpression(staticQueryContext);
+
+            initialized.set(true);
+        }
     }
 
 }

Modified: 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java?rev=676550&r1=676549&r2=676550&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
 Mon Jul 14 04:16:00 2008
@@ -30,7 +30,7 @@
 public class XQueryConcurrencyTest extends ContextTestSupport {
 
     public void testConcurrency() throws Exception {
-        int total = 100;
+        int total = 1000;
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(total);
@@ -43,11 +43,11 @@
             final int threadCount = i;
             executor.execute(new Runnable() {
                 public void run() {
-                    int start = threadCount * 20;
-                    for (int i = 0; i < 20; i++) {
+                    int start = threadCount * 200;
+                    for (int i = 0; i < 200; i++) {
                         try {
                             // do some random sleep to simulate spread in user 
activity
-                            Thread.sleep(new Random().nextInt(100));
+                            Thread.sleep(new Random().nextInt(10));
                         } catch (InterruptedException e) {
                             // ignore
                         }
@@ -57,19 +57,18 @@
             });
         }
 
-        mock.assertIsSatisfied();
         mock.assertNoDuplicates(body());
+        mock.assertIsSatisfied();
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // only retry at max 2 times to cather
-                // if set to 0 we can get interal Saxon errors - SENR0001
-                errorHandler(new 
DeadLetterChannelBuilder().maximumRedeliveries(1));
+                // no retry as we want every failure to submerge
+                errorHandler(noErrorHandler());
 
                 from("seda:in")
-                    .thread(10)
+                    .thread(5)
                     .transform().xquery("/person/id", String.class)
                     .to("mock:result");
             }

Modified: 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java?rev=676550&r1=676549&r2=676550&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
 Mon Jul 14 04:16:00 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.xquery;
 
+import java.util.Random;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.DeadLetterChannelBuilder;
 import org.apache.camel.builder.RouteBuilder;
@@ -27,10 +29,8 @@
  */
 public class XQueryURLBasedConcurrencyTest extends ContextTestSupport {
 
-    // TODO: Work in progress
-
     public void testConcurrency() throws Exception {
-        int total = 1;
+        int total = 100;
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(total);
@@ -39,32 +39,40 @@
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(5);
         executor.afterPropertiesSet();
-        for (int i = 0; i < 1; i++) {
+        for (int i = 0; i < 5; i++) {
             final int threadCount = i;
             executor.execute(new Runnable() {
                 public void run() {
                     int start = threadCount * 20;
-                    for (int i = 0; i < 1; i++) {
-                        template.sendBody("seda:in",
-                            "<mail><subject>" + (start + i) + 
"</subject><body>Hello world!</body></mail>");
+                    for (int i = 0; i < 20; i++) {
+                        try {
+                            // do some random sleep to simulate spread in user 
activity
+                            Thread.sleep(new Random().nextInt(10));
+                        } catch (InterruptedException e) {
+                            // ignore
+                        }
+                        template.sendBody("direct:start",
+                            "<mail><subject>Hey</subject><body>Hello 
world!</body></mail>");
                     }
                 }
             });
         }
 
-        mock.assertIsSatisfied();
         mock.assertNoDuplicates(body());
+        mock.assertIsSatisfied();
+
+        System.out.println("The End");
+        System.out.println(mock.getExchanges().get(0).getIn().getBody());
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // only retry at max 2 times to cather
-                // if set to 0 we can get interal Saxon errors - SENR0001
-                errorHandler(new 
DeadLetterChannelBuilder().maximumRedeliveries(2));
+                // no retry as we want every failure to submerge
+                errorHandler(noErrorHandler());
 
-                from("seda:in")
-                    .thread(10)
+                from("direct:start")
+                    .thread(5)
                     
.to("xquery:org/apache/camel/component/xquery/transform.xquery")
                     .to("mock:result");
             }


Reply via email to