Author: davsclaus
Date: Mon Jul 14 04:16:00 2008
New Revision: 676550
URL: http://svn.apache.org/viewvc?rev=676550&view=rev
Log:
CAMEL-702: A proposed fix for CAMEL-702. Gert will take a look too. Concurrency
issue with camel-saon. WORK IN PROGRESS!!
Modified:
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
Modified:
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java?rev=676550&r1=676549&r2=676550&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
(original)
+++
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
Mon Jul 14 04:16:00 2008
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.dom.DOMResult;
@@ -78,6 +79,7 @@
private ResultFormat resultsFormat = ResultFormat.DOM;
private Properties properties = new Properties();
private Class resultType;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
@Override
public String toString() {
@@ -93,10 +95,13 @@
}
public Object evaluate(Exchange exchange) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Evaluation " + expression + " for exchange: " +
exchange);
- }
try {
+ initialize();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Evaluation " + expression + " for exchange: " +
exchange);
+ }
+
if (resultType != null) {
if (resultType.equals(String.class)) {
return evaluateAsString(exchange);
@@ -128,28 +133,29 @@
}
}
- /**
- * Configures the namespace context from the given DOM element
- */
- public void setNamespaces(Map<String, String> namespaces) {
- namespacePrefixes.putAll(namespaces);
- }
-
public List evaluateAsList(Exchange exchange) throws Exception {
+ initialize();
+
return getExpression().evaluate(createDynamicContext(exchange));
}
public Object evaluateAsStringSource(Exchange exchange) throws Exception {
+ initialize();
+
String text = evaluateAsString(exchange);
return new StringSource(text);
}
public Object evaluateAsBytesSource(Exchange exchange) throws Exception {
+ initialize();
+
byte[] bytes = evaluateAsBytes(exchange);
return new BytesSource(bytes);
}
public Node evaluateAsDOM(Exchange exchange) throws Exception {
+ initialize();
+
DOMResult result = new DOMResult();
DynamicQueryContext context = createDynamicContext(exchange);
XQueryExpression expression = getExpression();
@@ -158,6 +164,8 @@
}
public byte[] evaluateAsBytes(Exchange exchange) throws Exception {
+ initialize();
+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
Result result = new StreamResult(buffer);
getExpression().pull(createDynamicContext(exchange), result,
properties);
@@ -166,6 +174,8 @@
}
public String evaluateAsString(Exchange exchange) throws Exception {
+ initialize();
+
StringWriter buffer = new StringWriter();
SequenceIterator iter =
getExpression().iterator(createDynamicContext(exchange));
for (Item item = iter.next(); item != null; item = iter.next()) {
@@ -198,7 +208,8 @@
//-------------------------------------------------------------------------
public static XQueryBuilder xquery(final String queryText) {
return new XQueryBuilder() {
- protected XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext) throws
XPathException {
+ protected XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext)
+ throws XPathException {
return staticQueryContext.compileQuery(queryText);
}
};
@@ -206,7 +217,8 @@
public static XQueryBuilder xquery(final Reader reader) {
return new XQueryBuilder() {
- protected XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext) throws
XPathException, IOException {
+ protected XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext)
+ throws XPathException, IOException {
return staticQueryContext.compileQuery(reader);
}
};
@@ -214,7 +226,8 @@
public static XQueryBuilder xquery(final InputStream in, final String
characterSet) {
return new XQueryBuilder() {
- protected XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext) throws
XPathException, IOException {
+ protected XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext)
+ throws XPathException, IOException {
return staticQueryContext.compileQuery(in, characterSet);
}
};
@@ -245,6 +258,8 @@
public XQueryBuilder namespace(String prefix, String uri) {
namespacePrefixes.put(prefix, uri);
+ // more namespace, we must re initialize
+ initialized.set(false);
return this;
}
@@ -291,42 +306,37 @@
// Properties
//
-------------------------------------------------------------------------
- public synchronized XQueryExpression getExpression() throws IOException,
XPathException {
- if (expression == null) {
- expression = createQueryExpression(getStaticQueryContext());
- clearBuilderReferences();
- }
+ /**
+ * Configures the namespace context from the given DOM element
+ */
+ public void setNamespaces(Map<String, String> namespaces) {
+ namespacePrefixes.putAll(namespaces);
+ // more namespace, we must re initialize
+ initialized.set(false);
+ }
+
+ public XQueryExpression getExpression() throws IOException, XPathException
{
return expression;
}
- public synchronized Configuration getConfiguration() {
- if (configuration == null) {
- configuration = new Configuration();
- configuration.setHostLanguage(Configuration.XQUERY);
- }
+ public Configuration getConfiguration() {
return configuration;
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
+ // change configuration, we must re intialize
+ initialized.set(false);
}
- public synchronized StaticQueryContext getStaticQueryContext() {
- if (staticQueryContext == null) {
- staticQueryContext = new StaticQueryContext(getConfiguration());
- Set<Map.Entry<String, String>> entries =
namespacePrefixes.entrySet();
- for (Map.Entry<String, String> entry : entries) {
- String prefix = entry.getKey();
- String uri = entry.getValue();
- staticQueryContext.declareNamespace(prefix, uri);
- staticQueryContext.setInheritNamespaces(true);
- }
- }
+ public StaticQueryContext getStaticQueryContext() {
return staticQueryContext;
}
public void setStaticQueryContext(StaticQueryContext staticQueryContext) {
this.staticQueryContext = staticQueryContext;
+ // change context, we must re intialize
+ initialized.set(false);
}
public Map<String, Object> getParameters() {
@@ -367,7 +377,8 @@
/**
* A factory method to create the XQuery expression
*/
- protected abstract XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext) throws
XPathException, IOException;
+ protected abstract XQueryExpression
createQueryExpression(StaticQueryContext staticQueryContext)
+ throws XPathException, IOException;
/**
* Creates a dynamic context for the given exchange
@@ -404,7 +415,8 @@
* @param exchange
* @throws Exception
*/
- protected void configureQuery(DynamicQueryContext dynamicQueryContext,
Exchange exchange) throws Exception {
+ protected void configureQuery(DynamicQueryContext dynamicQueryContext,
Exchange exchange)
+ throws Exception {
addParameters(dynamicQueryContext, exchange.getProperties());
addParameters(dynamicQueryContext, exchange.getIn().getHeaders());
addParameters(dynamicQueryContext, getParameters());
@@ -422,17 +434,35 @@
}
}
+ protected boolean matches(Exchange exchange, List results) {
+ return ObjectHelper.matches(results);
+ }
+
/**
- * To avoid keeping around any unnecessary objects after the expression has
- * been created lets nullify references here
+ * Initializes this builder - <b>Must be invoked before evaluation</b>.
*/
- protected void clearBuilderReferences() {
- staticQueryContext = null;
- configuration = null;
- }
+ protected synchronized void initialize() throws XPathException,
IOException {
+ // must use synchronized for concurrency issues and only let it
intialize once
+ if (!initialized.get()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing XQueryBuilder " + this);
+ }
+ configuration = new Configuration();
+ configuration.setHostLanguage(Configuration.XQUERY);
- protected boolean matches(Exchange exchange, List results) {
- return ObjectHelper.matches(results);
+ staticQueryContext = new StaticQueryContext(getConfiguration());
+ Set<Map.Entry<String, String>> entries =
namespacePrefixes.entrySet();
+ for (Map.Entry<String, String> entry : entries) {
+ String prefix = entry.getKey();
+ String uri = entry.getValue();
+ staticQueryContext.declareNamespace(prefix, uri);
+ staticQueryContext.setInheritNamespaces(true);
+ }
+
+ expression = createQueryExpression(staticQueryContext);
+
+ initialized.set(true);
+ }
}
}
Modified:
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java?rev=676550&r1=676549&r2=676550&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
(original)
+++
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
Mon Jul 14 04:16:00 2008
@@ -30,7 +30,7 @@
public class XQueryConcurrencyTest extends ContextTestSupport {
public void testConcurrency() throws Exception {
- int total = 100;
+ int total = 1000;
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(total);
@@ -43,11 +43,11 @@
final int threadCount = i;
executor.execute(new Runnable() {
public void run() {
- int start = threadCount * 20;
- for (int i = 0; i < 20; i++) {
+ int start = threadCount * 200;
+ for (int i = 0; i < 200; i++) {
try {
// do some random sleep to simulate spread in user
activity
- Thread.sleep(new Random().nextInt(100));
+ Thread.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
// ignore
}
@@ -57,19 +57,18 @@
});
}
- mock.assertIsSatisfied();
mock.assertNoDuplicates(body());
+ mock.assertIsSatisfied();
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // only retry at max 2 times to cather
- // if set to 0 we can get interal Saxon errors - SENR0001
- errorHandler(new
DeadLetterChannelBuilder().maximumRedeliveries(1));
+ // no retry as we want every failure to submerge
+ errorHandler(noErrorHandler());
from("seda:in")
- .thread(10)
+ .thread(5)
.transform().xquery("/person/id", String.class)
.to("mock:result");
}
Modified:
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java?rev=676550&r1=676549&r2=676550&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
(original)
+++
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
Mon Jul 14 04:16:00 2008
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.xquery;
+import java.util.Random;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.DeadLetterChannelBuilder;
import org.apache.camel.builder.RouteBuilder;
@@ -27,10 +29,8 @@
*/
public class XQueryURLBasedConcurrencyTest extends ContextTestSupport {
- // TODO: Work in progress
-
public void testConcurrency() throws Exception {
- int total = 1;
+ int total = 100;
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(total);
@@ -39,32 +39,40 @@
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.afterPropertiesSet();
- for (int i = 0; i < 1; i++) {
+ for (int i = 0; i < 5; i++) {
final int threadCount = i;
executor.execute(new Runnable() {
public void run() {
int start = threadCount * 20;
- for (int i = 0; i < 1; i++) {
- template.sendBody("seda:in",
- "<mail><subject>" + (start + i) +
"</subject><body>Hello world!</body></mail>");
+ for (int i = 0; i < 20; i++) {
+ try {
+ // do some random sleep to simulate spread in user
activity
+ Thread.sleep(new Random().nextInt(10));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ template.sendBody("direct:start",
+ "<mail><subject>Hey</subject><body>Hello
world!</body></mail>");
}
}
});
}
- mock.assertIsSatisfied();
mock.assertNoDuplicates(body());
+ mock.assertIsSatisfied();
+
+ System.out.println("The End");
+ System.out.println(mock.getExchanges().get(0).getIn().getBody());
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // only retry at max 2 times to cather
- // if set to 0 we can get interal Saxon errors - SENR0001
- errorHandler(new
DeadLetterChannelBuilder().maximumRedeliveries(2));
+ // no retry as we want every failure to submerge
+ errorHandler(noErrorHandler());
- from("seda:in")
- .thread(10)
+ from("direct:start")
+ .thread(5)
.to("xquery:org/apache/camel/component/xquery/transform.xquery")
.to("mock:result");
}