Author: davsclaus Date: Tue Feb 23 17:00:02 2010 New Revision: 915431 URL: http://svn.apache.org/viewvc?rev=915431&view=rev Log: CAMEL-217: Persistent aggregator. Polished code a bit.
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=915431&r1=915430&r2=915431&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Tue Feb 23 17:00:02 2010 @@ -59,9 +59,10 @@ if (rc == null) { return null; } + // TODO: We can improve performance by not returning the old when adding return unmarshallExchange(rc); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException("Error adding to repository " + name + " with key " + key, e); } } @@ -80,7 +81,7 @@ } return unmarshallExchange(rc); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException("Error getting key " + key + " from repository " + name, e); } } @@ -94,7 +95,7 @@ } }); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException("Error removing key " + key + " from repository " + name, e); } } @@ -106,7 +107,9 @@ protected Buffer marshallExchange(Exchange exchange) throws IOException { DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); + // use DefaultExchangeHolder to marshal to a serialized object DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false); + // TODO: store aggregation size exchangeMarshaller.writePayload(pe, baos); return baos.toBuffer(); } Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java?rev=915431&r1=915430&r2=915431&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java Tue Feb 23 17:00:02 2010 @@ -36,7 +36,9 @@ private static final transient Log LOG = LogFactory.getLog(HawtDBFile.class); - private final static BTreeIndexFactory<String, Integer> indexesFactory = new BTreeIndexFactory<String, Integer>(); + // the root which contains an index with name -> page for the real indexes + private final static BTreeIndexFactory<String, Integer> rootIndexesFactory = new BTreeIndexFactory<String, Integer>(); + // the real indexes where we store persisted data in buffers private final static BTreeIndexFactory<Buffer, Buffer> indexFactory = new BTreeIndexFactory<Buffer, Buffer>(); public HawtDBFile() { @@ -44,9 +46,9 @@ } static { - indexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE); - indexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE); - indexesFactory.setDeferredEncoding(true); + rootIndexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE); + rootIndexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE); + rootIndexesFactory.setDeferredEncoding(true); indexFactory.setKeyMarshaller(VariableBufferMarshaller.INSTANCE); indexFactory.setValueMarshaller(VariableBufferMarshaller.INSTANCE); indexFactory.setDeferredEncoding(true); @@ -69,10 +71,10 @@ int page = tx.allocator().alloc(1); // if we just created the file, first allocated page should be 0 assert page == 0; - indexesFactory.create(tx, 0); + rootIndexesFactory.create(tx, 0); LOG.info("Aggregation repository data store created using file: " + getFile()); } else { - Index<String, Integer> indexes = indexesFactory.open(tx, 0); + Index<String, Integer> indexes = rootIndexesFactory.open(tx, 0); LOG.info("Aggregation repository data store loaded using file: " + getFile() + " containing " + indexes.size() + " repositories."); } @@ -91,20 +93,28 @@ } public <T> T execute(Work<T> work) { + if (LOG.isTraceEnabled()) { + LOG.trace("Executing work " + work); + } + Transaction tx = pageFile.tx(); try { T rc = work.execute(tx); tx.commit(); return rc; } catch (RuntimeException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Error executing work " + work + " will do rollback", e); + } tx.rollback(); throw e; } } public Index<Buffer, Buffer> getRepositoryIndex(Transaction tx, String name) { - Index<String, Integer> indexes = indexesFactory.open(tx, 0); + Index<String, Integer> indexes = rootIndexesFactory.open(tx, 0); Integer location = indexes.get(name); + if (location == null) { // create it.. int page = tx.allocator().alloc(1); @@ -113,8 +123,15 @@ // add it to indexes so we can find it the next time indexes.put(name, page); + if (LOG.isDebugEnabled()) { + LOG.debug("Created new repository index with name " + name + " at location " + page); + } + return created; } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Repository index with name " + name + " at location " + location); + } return indexFactory.open(tx, location); } } Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=915431&r1=915430&r2=915431&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Tue Feb 23 17:00:02 2010 @@ -23,9 +23,6 @@ import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -/** - * Tests the HawtDBAggregationRepository implementation. - */ public class HawtDBAggregationRepositoryTest extends CamelTestSupport { private HawtDBFile hawtDBFile;