Author: davsclaus
Date: Sun Sep 28 01:51:40 2008
New Revision: 699788

URL: http://svn.apache.org/viewvc?rev=699788&view=rev
Log:
CAMEL-126: Applied patch no 3 with thanks to Martin Krasser

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java Sun Sep 28 01:51:40 2008
@@ -34,6 +34,7 @@
 import org.apache.camel.model.language.ExpressionType;
 import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.StreamResequencer;
+import org.apache.camel.processor.resequencer.ExpressionResultComparator;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -163,6 +164,41 @@
         stream(streamConfig);
     }
     
+    public ResequencerType timeout(long timeout) {
+        if (batchConfig != null) {
+            batchConfig.setBatchTimeout(timeout);
+        } else {
+            streamConfig.setTimeout(timeout);
+        }
+        return this;
+    }
+    
+    public ResequencerType size(int batchSize) {
+        if (batchConfig == null) {
+            throw new IllegalStateException("size() only supported for batch resequencer");
+        }
+        batchConfig.setBatchSize(batchSize);
+        return this;
+    }
+
+    public ResequencerType capacity(int capacity) {
+        if (streamConfig == null) {
+            throw new IllegalStateException("capacity() only supported for stream resequencer");
+        }
+        streamConfig.setCapacity(capacity);
+        return this;
+        
+    }
+    
+    public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
+        if (streamConfig == null) {
+            throw new IllegalStateException("comparator() only supported for stream resequencer");
+        }
+        streamConfig.setComparator(comparator);
+        return this;
+        
+    }
+    
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         if (batchConfig != null) {

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Sun Sep 28 01:51:40 2008
@@ -200,15 +200,7 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        if (engine.size() >= capacity) {
-            Thread.sleep(getTimeout());
-        } else {
-            if (exchange != null) {
-                engine.insert(exchange);
-            }
-        }
-        engine.deliver();
-        
+        // empty since exchanges come from endpoint's polling consumer
     }
 
 }
\ No newline at end of file


Reply via email to