I am using Camel 2.6.0 with Active MQ 5.4.2, and am having a problem with the 
Aggregator using the HawtDB persistence (1.5 for HawtDB and 1.3 for HawtBuf).

What I am trying to do is to consume files from an FTP source and then 
aggregate these files into a single exchange.  The FTP site contains 3 
different files with the same base file name, e.g., FILENAME.abc, 
FILENAME.xyz, and FILENAME.done.  I am using a custom file strategy object to 
verify that the FILENAME.done is there before consuming either of the other two 
files (because the doneFile support did not exist when I first wrote this 
code), and this all works fine.   The filename is used as the aggregator 
correlation key, e.g., A123456 in the output below.
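
The core of that custom file strategy is just an existence check on the derived .done name. A minimal sketch of the idea (the class and method names here are illustrative, not my actual code):

```java
import java.io.File;

public class DoneFileCheck {
    // Given a data file such as A123456.abc, return true only if the
    // matching marker file (A123456.done) exists in the same directory.
    public static boolean isReadyToConsume(File dataFile) {
        String name = dataFile.getName();
        int dot = name.lastIndexOf('.');
        String base = (dot >= 0) ? name.substring(0, dot) : name;
        File doneFile = new File(dataFile.getParentFile(), base + ".done");
        return doneFile.exists();
    }
}
```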

If I run my test without the HawtDB-backed aggregator persistence, everything 
works fine.  The FILENAME.abc is consumed, then the 
FILENAME.xyz is consumed and these are then sent to my aggregator class 
process() method containing a grouped exchange.  In this method, I write the 
files to local temp files (using an input stream and File objects) and then 
read these files using a third party program that parses these files and 
presents the results to me in record format.
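
The write-to-temp-file step in my process() method boils down to copying an input stream to disk; roughly (simplified, with illustrative names):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class TempFileWriter {
    // Copy a message body (as an InputStream) to a local temp file so the
    // third-party parser can read it from disk.
    public static File writeToTempFile(InputStream in, String prefix) throws IOException {
        File tmp = File.createTempFile(prefix, ".tmp");
        try (OutputStream out = new FileOutputStream(tmp)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } finally {
            in.close();
        }
        return tmp;
    }
}
```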

However, if I make a single change to my activemq.xml file to add the reference 
to my aggregator persistence, it continues to consume both files, but never 
aggregates them and sends them to my process() method.

  <aggregate groupExchanges="true" completionSize="2"
             aggregationRepositoryRef="aggregatorRepository">
    <correlationExpression>
      <header>basename</header>
    </correlationExpression>
    <process ref="myFileParser"/>
  </aggregate>

  <bean id="aggregatorRepository"
        class="org.apache.camel.component.hawtdb.HawtDBAggregationRepository">
    <property name="repositoryName" value="myAggregator"/>
    <property name="persistentFileName" value="data/myAggregator.dat"/>
  </bean>

My reason for wanting to use this persistence framework is to provide better 
error handling in the event that my server crashes before completely 
processing the aggregated result, since there does not appear to be 
transactional support in the aggregator.  The commit() method on my custom 
file strategy class gets called as each file is consumed, not after the 
aggregated results are processed.

Is there any reason this should not work, or am I misunderstanding what this 
aggregator persistence is supposed to provide?

When it consumes the first file, the log output from HawtDB is as follows:

2011-06-07 14:38:25,834 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Scan
2011-06-07 14:38:25,834 | TRACE | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Scanned and found no exchange to recover.
2011-06-07 14:38:25,845 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ start +++ Getting key [A123456]
2011-06-07 14:38:25,845 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator -> null
2011-06-07 14:38:25,846 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read only: true for executed work: Getting key [A123456]
2011-06-07 14:38:25,848 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Getting key [A123456]
2011-06-07 14:38:25,848 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Getting key [A123456] -> null
2011-06-07 14:38:25,849 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Adding key [A123456] -> Exchange[null]
2011-06-07 14:38:25,852 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ start +++ Adding key [A123456]
2011-06-07 14:38:25,853 | DEBUG | org.apache.camel.component.hawtdb.HawtDBFile | Created new repository index with name myAggregator at location 1
2011-06-07 14:38:25,853 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator -> { page: 1, deferredEncoding: true }
2011-06-07 14:38:25,854 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read only: false for executed work: Adding key [A123456]
2011-06-07 14:38:25,858 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Adding key [A123456]

When it consumes the second file, the output is as follows:

2011-06-07 14:38:25,886 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ start +++ Getting key [A123456]
2011-06-07 14:38:25,887 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator at location 1
2011-06-07 14:38:25,887 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator -> { page: 1, deferredEncoding: true }
2011-06-07 14:38:25,888 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read only: true for executed work: Getting key [A123456]
2011-06-07 14:38:25,890 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Getting key [A123456]
2011-06-07 14:38:25,905 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Getting key [A123456] -> Exchange[Message: [Body is null]]

At this point, it appears to be stuck.  The commit() method of my file strategy 
is never called for the second file, nor is the grouped Exchange sent to my 
process() method.

Any suggestions?

Thanks,
Craig