Author: davsclaus
Date: Sun Sep 28 01:51:40 2008
New Revision: 699788
URL: http://svn.apache.org/viewvc?rev=699788&view=rev
Log:
CAMEL-126: Applied patch no 3 with thanks to Martin Krasser
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java Sun Sep 28 01:51:40 2008
@@ -34,6 +34,7 @@
import org.apache.camel.model.language.ExpressionType;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
+import org.apache.camel.processor.resequencer.ExpressionResultComparator;
import org.apache.camel.spi.RouteContext;
/**
@@ -163,6 +164,41 @@
stream(streamConfig);
}
+ public ResequencerType timeout(long timeout) {
+ if (batchConfig != null) {
+ batchConfig.setBatchTimeout(timeout);
+ } else {
+ streamConfig.setTimeout(timeout);
+ }
+ return this;
+ }
+
+ public ResequencerType size(int batchSize) {
+ if (batchConfig == null) {
+ throw new IllegalStateException("size() only supported for batch resequencer");
+ }
+ batchConfig.setBatchSize(batchSize);
+ return this;
+ }
+
+ public ResequencerType capacity(int capacity) {
+ if (streamConfig == null) {
+ throw new IllegalStateException("capacity() only supported for stream resequencer");
+ }
+ streamConfig.setCapacity(capacity);
+ return this;
+
+ }
+
+ public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
+ if (streamConfig == null) {
+ throw new IllegalStateException("comparator() only supported for stream resequencer");
+ }
+ streamConfig.setComparator(comparator);
+ return this;
+
+ }
+
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
if (batchConfig != null) {
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Sun Sep 28 01:51:40 2008
@@ -200,15 +200,7 @@
}
public void process(Exchange exchange) throws Exception {
- if (engine.size() >= capacity) {
- Thread.sleep(getTimeout());
- } else {
- if (exchange != null) {
- engine.insert(exchange);
- }
- }
- engine.deliver();
-
+ // empty since exchanges come from endpoint's polling consumer
}
}
\ No newline at end of file