Updated Branches:
  refs/heads/camel-2.11.x 1e1578743 -> b62aabee3

CAMEL-6507 Add aggregate ability to camel-mongodb, with thanks to Pierre-Alban.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3ca6b2f9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3ca6b2f9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3ca6b2f9

Branch: refs/heads/camel-2.11.x
Commit: 3ca6b2f9fe40053a8818808301195899110ca9d4
Parents: 1e15787
Author: Willem Jiang <ningji...@apache.org>
Authored: Thu Jul 4 10:31:49 2013 +0800
Committer: Raúl Kripalani <ra...@apache.org>
Committed: Sun Jul 7 23:33:33 2013 +0100

----------------------------------------------------------------------
 .../component/mongodb/MongoDbOperation.java     |  3 ++
 .../component/mongodb/MongoDbProducer.java      | 44 +++++++++++++++++++-
 .../mongodb/MongoDbOperationsTest.java          | 20 +++++++++
 3 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3ca6b2f9/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
index 8d11fde..bb6ee6a 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
@@ -33,6 +33,9 @@ public enum MongoDbOperation {
     // delete operations
     remove, 
     
+    //aggregat
+    aggregate,
+    
     // others
     getDbStats, 
     getColStats, 

http://git-wip-us.apache.org/repos/asf/camel/blob/3ca6b2f9/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
index ea04abe..166c62b 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.mongodb;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.mongodb.AggregationOutput;
+import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
 import com.mongodb.CommandResult;
 import com.mongodb.DB;
@@ -113,7 +115,11 @@ public class MongoDbProducer extends DefaultProducer {
         case remove:
             doRemove(exchange);
             break;
-
+        
+        case aggregate:
+            doAggregate(exchange);
+            break;
+        
         case getDbStats:
             doGetStats(exchange, MongoDbOperation.getDbStats);
             break;
@@ -339,6 +345,42 @@ public class MongoDbProducer extends DefaultProducer {
         resultMessage.setBody(answer);
     }
     
+    /**
+    * All headers except collection and database are non available for this
+    * operation.
+    * 
+    * @param exchange
+    * @throws Exception
+    */
+    protected void doAggregate(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject query = exchange.getIn().getMandatoryBody(DBObject.class);
+
+        // Impossible with java driver to get the batch size and number to skip
+        Iterable<DBObject> dbIterator = null;
+        try {
+            AggregationOutput aggregationResult = null;
+
+            // Allow body to be a pipeline
+            // @see http://docs.mongodb.org/manual/core/aggregation/
+            if (query instanceof BasicDBList) {
+                BasicDBList queryList = (BasicDBList)query;
+                aggregationResult = 
dbCol.aggregate((DBObject)queryList.get(0), (BasicDBObject[])queryList
+                    .subList(1, queryList.size()).toArray(new 
BasicDBObject[queryList.size() - 1]));
+            } else {
+                aggregationResult = dbCol.aggregate(query);
+            }
+
+            dbIterator = aggregationResult.results();
+            Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.aggregate);
+            resultMessage.setBody(dbIterator);
+
+            // Mongo Driver does not allow to read size and to paginate 
aggregate result
+        } catch (Exception e) {
+            // rethrow the exception
+            throw e;
+        }
+    }
     // --------- Convenience methods -----------------------
     
     private DBCollection calculateCollection(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3ca6b2f9/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
index 32be1a3..3176abe 100644
--- 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
+++ 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.mongodb;
 
 import java.util.Formatter;
+import java.util.List;
 
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
@@ -157,6 +158,24 @@ public class MongoDbOperationsTest extends 
AbstractMongoDbTest {
     }
     
     @Test
+    public void testAggregate() throws Exception {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        // Repeat ten times, obtain 10 batches of 100 results each time
+        Object result = template
+            .requestBody("direct:aggregate",
+                         "[{ $match : {$or : [{\"scientist\" : 
\"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", 
count: { $sum: 1 }} } ]");
+        assertTrue("Result is not of type List", result instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<DBObject> resultList = (List<DBObject>)result;
+        assertListSize("Result does not contain 2 elements", resultList, 2);
+        // TODO Add more asserts
+    }
+    
+    @Test
     public void testDbStats() throws Exception {
         assertEquals(0, testCollection.count());
         Object result = template.requestBody("direct:getDbStats", 
"irrelevantBody");
@@ -210,6 +229,7 @@ public class MongoDbOperationsTest extends 
AbstractMongoDbTest {
                 
from("direct:save").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeConcern=SAFE");
                 
from("direct:update").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update&writeConcern=SAFE");
                 
from("direct:remove").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove&writeConcern=SAFE");
+                
from("direct:aggregate").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&writeConcern=SAFE");
                 
from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats");
                 
from("direct:getColStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");
 

Reply via email to