Author: davsclaus
Date: Tue Feb 23 17:00:02 2010
New Revision: 915431

URL: http://svn.apache.org/viewvc?rev=915431&view=rev
Log:
CAMEL-217: Persistent aggregator. Polished code a bit.

Modified:
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=915431&r1=915430&r2=915431&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Tue Feb 23 17:00:02 2010
@@ -59,9 +59,10 @@
             if (rc == null) {
                 return null;
             }
+            // TODO: We can improve performance by not returning the old when adding
             return unmarshallExchange(rc);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error adding to repository " + name + " with key " + key, e);
         }
     }
 
@@ -80,7 +81,7 @@
             }
             return unmarshallExchange(rc);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error getting key " + key + " from repository " + name, e);
         }
     }
 
@@ -94,7 +95,7 @@
                 }
             });
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error removing key " + key + " from repository " + name, e);
         }
     }
 
@@ -106,7 +107,9 @@
 
     protected Buffer marshallExchange(Exchange exchange) throws IOException {
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+        // use DefaultExchangeHolder to marshal to a serialized object
         DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
+        // TODO: store aggregation size 
         exchangeMarshaller.writePayload(pe, baos);
         return baos.toBuffer();
     }

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java?rev=915431&r1=915430&r2=915431&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java Tue Feb 23 17:00:02 2010
@@ -36,7 +36,9 @@
 
     private static final transient Log LOG = LogFactory.getLog(HawtDBFile.class);
 
-    private final static BTreeIndexFactory<String, Integer> indexesFactory = new BTreeIndexFactory<String, Integer>();
+    // the root which contains an index with name -> page for the real indexes
+    private final static BTreeIndexFactory<String, Integer> rootIndexesFactory = new BTreeIndexFactory<String, Integer>();
+    // the real indexes where we store persisted data in buffers
     private final static BTreeIndexFactory<Buffer, Buffer> indexFactory = new BTreeIndexFactory<Buffer, Buffer>();
 
     public HawtDBFile() {
@@ -44,9 +46,9 @@
     }
 
     static {
-        indexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE);
-        indexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
-        indexesFactory.setDeferredEncoding(true);
+        rootIndexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE);
+        rootIndexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
+        rootIndexesFactory.setDeferredEncoding(true);
         indexFactory.setKeyMarshaller(VariableBufferMarshaller.INSTANCE);
         indexFactory.setValueMarshaller(VariableBufferMarshaller.INSTANCE);
         indexFactory.setDeferredEncoding(true);
@@ -69,10 +71,10 @@
                     int page = tx.allocator().alloc(1);
                     // if we just created the file, first allocated page should be 0
                     assert page == 0;
-                    indexesFactory.create(tx, 0);
+                    rootIndexesFactory.create(tx, 0);
                     LOG.info("Aggregation repository data store created using file: " + getFile());
                 } else {
-                    Index<String, Integer> indexes = indexesFactory.open(tx, 0);
+                    Index<String, Integer> indexes = rootIndexesFactory.open(tx, 0);
                     LOG.info("Aggregation repository data store loaded using file: " + getFile()
                             + " containing " + indexes.size() + " repositories.");
                 }
@@ -91,20 +93,28 @@
     }
 
     public <T> T execute(Work<T> work) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Executing work " + work);
+        }
+
         Transaction tx = pageFile.tx();
         try {
             T rc = work.execute(tx);
             tx.commit();
             return rc;
         } catch (RuntimeException e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Error executing work " + work + " will do rollback", e);
+            }
             tx.rollback();
             throw e;
         }
     }
 
     public Index<Buffer, Buffer> getRepositoryIndex(Transaction tx, String name) {
-        Index<String, Integer> indexes = indexesFactory.open(tx, 0);
+        Index<String, Integer> indexes = rootIndexesFactory.open(tx, 0);
         Integer location = indexes.get(name);
+
         if (location == null) {
             // create it..
             int page = tx.allocator().alloc(1);
@@ -113,8 +123,15 @@
             // add it to indexes so we can find it the next time
             indexes.put(name, page);
 
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created new repository index with name " + name + " at location " + page);
+            }
+
             return created;
         } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Repository index with name " + name + " at location " + location);
+            }
             return indexFactory.open(tx, location);
         }
     }

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=915431&r1=915430&r2=915431&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Tue Feb 23 17:00:02 2010
@@ -23,9 +23,6 @@
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-/**
- * Tests the HawtDBAggregationRepository implementation.
- */
 public class HawtDBAggregationRepositoryTest extends CamelTestSupport {
 
     private HawtDBFile hawtDBFile;


Reply via email to