JDBC Persistent Aggregator
--------------------------

                 Key: CAMEL-3502
                 URL: https://issues.apache.org/jira/browse/CAMEL-3502
             Project: Camel
          Issue Type: New Feature
    Affects Versions: 2.6.0
         Environment: JDBC Aggregator Database
            Reporter: Olivier Roger


The patch provided is an implementation of an Aggregator for JDBC.

It implements *org.apache.camel.spi.AggregationRepository* (2.4+) and has been 
tested on Camel 2.4 and 2.6.
Some details have been discussed on the [mailing 
list|http://camel.465427.n5.nabble.com/Aggregator-Persistence-td2800301.html] 
with Claus.

The implementation is very similar to the one for HawtDB. 
Unit tests have been included and use the H2 in-memory database.

On that subject, the test *JdbcAggregateLoadAndRecoverTest* has been adapted to 
exclude the redeliveries from the failing messages. Is it possible for a 
message to be sent by a background _scan()_ operation even before it is sent 
by the _onCompletion()_ method?

Here is some brief documentation:

{quote}
h2. Database

To be operational, each aggregator uses two tables: the aggregation table and 
the completed table. By convention, the completed table has the same name as 
the aggregation table, suffixed with "_COMPLETED". The name must be configured 
in the Spring bean with the _RepositoryName_ property. In the following 
example, "aggregation" is used as the repository name.

The structure of both tables is identical: in both cases a String value is 
used as the key (*id*), whereas a Blob contains the exchange serialized as a 
byte array.
One difference should be remembered, however: the *id* field does not hold the 
same content in each table.
In the aggregation table, *id* holds the correlation id used by the component 
to aggregate the messages. In the completed table, *id* holds the id of the 
exchange stored in the corresponding blob field.

Here are the SQL statements used to create the tables; just replace 
"aggregation" with your aggregator repository name.
{code}
CREATE TABLE aggregation (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE aggregation_completed (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_completed_pk PRIMARY KEY (id)
);
{code}
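
The naming convention can also be sketched in code. The helper below is 
hypothetical (the class *AggregationTables* and its methods are not part of 
the patch); it only derives the two table names from a repository name and 
prints the matching DDL. It uses a lowercase "_completed" suffix to match the 
statements above, assuming the database treats unquoted identifiers as 
case-insensitive.

```java
public final class AggregationTables {

    // Hypothetical helper: name of the completed table for a repository.
    public static String completedTableName(String repositoryName) {
        return repositoryName + "_completed";
    }

    // Builds the CREATE TABLE statement shown above for the given table name.
    public static String createTableSql(String tableName) {
        return "CREATE TABLE " + tableName + " (\n"
             + "    id varchar(255) NOT NULL,\n"
             + "    exchange blob NOT NULL,\n"
             + "    constraint " + tableName + "_pk PRIMARY KEY (id)\n"
             + ");";
    }

    public static void main(String[] args) {
        // Defaults to the repository name used in this documentation.
        String repositoryName = args.length > 0 ? args[0] : "aggregation";
        System.out.println(createTableSql(repositoryName));
        System.out.println(createTableSql(completedTableName(repositoryName)));
    }
}
```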

h3. Codec (Serialization)

Since they can contain any type of payload, Exchanges are not serializable by 
design. Each exchange is therefore converted into a byte array to be stored in 
a database BLOB field. All those conversions are handled by the *JdbcCodec* 
class. One detail of the code requires your attention: the 
*ClassLoadingAwareObjectInputStream*.

The *ClassLoadingAwareObjectInputStream* has been reused from the ActiveMQ 
project. It wraps an *ObjectInputStream* and uses it with the 
ContextClassLoader rather than the currentThread one. The benefit is the 
ability to load classes exposed by other bundles.
This allows the exchange body and headers to reference objects of custom 
types.
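
To illustrate the idea, here is a minimal sketch of an *ObjectInputStream* 
that resolves classes through a class loader supplied by the caller. It is 
not the ActiveMQ class nor the *JdbcCodec* from the patch: the names 
*LoaderAwareCodec*, *marshal* and *unmarshal* are assumptions made for this 
example, and which class loader the patch actually passes in is not shown 
here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;

public final class LoaderAwareCodec {

    /** ObjectInputStream that resolves classes through an explicit ClassLoader. */
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Try the supplied loader first.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution (also covers primitives).
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value to a byte array, e.g. for storage in a BLOB column. */
    public static byte[] marshal(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads a value back, loading its classes through the given loader. */
    public static Object unmarshal(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in =
                 new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to loader", e);
        }
    }
}
```

The fallback to the default resolution keeps primitive types working, since 
they cannot be looked up by name through a class loader.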

h2. Transactional 

TransactionTemplate is used to wrap all calls to the database.
Therefore a Transaction Manager is required (see the bean declaration).
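
For readers unfamiliar with the pattern, the sketch below shows the general 
shape of a template wrapping a callback: begin, run the work, commit on 
success, roll back if the work throws. It is a dependency-free illustration 
only. *SimpleTxManager*, *RecordingTxManager* and *inTransaction* are 
hypothetical names, not Spring types; the patch itself relies on Spring's 
TransactionTemplate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public final class TxSketch {

    /** Hypothetical stand-in for a transaction manager; not a Spring type. */
    public interface SimpleTxManager {
        void begin();
        void commit();
        void rollback();
    }

    /** Runs the work inside a transaction, rolling back if it throws. */
    public static <T> T inTransaction(SimpleTxManager tx, Supplier<T> work) {
        tx.begin();
        T result;
        try {
            result = work.get();
        } catch (RuntimeException e) {
            // The work failed: undo it and let the caller see the error.
            tx.rollback();
            throw e;
        }
        tx.commit();
        return result;
    }

    /** Records the calls it receives so their order can be inspected. */
    public static final class RecordingTxManager implements SimpleTxManager {
        public final List<String> calls = new ArrayList<>();
        @Override public void begin() { calls.add("begin"); }
        @Override public void commit() { calls.add("commit"); }
        @Override public void rollback() { calls.add("rollback"); }
    }
}
```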

h2. Service (Start/Stop)

The *JdbcAggregationRepository* extends ServiceSupport, which makes the 
repository part of the Camel component life cycle. 
The _start()_ method verifies the connection to the database and the presence 
of the required tables.

h2. Aggregator configuration

Depending on the targeted environment, the aggregator might need some 
configuration. As you already know, each aggregator should have its own 
repository (with the corresponding pair of tables created in the database) and 
a data source. If the default lobHandler is not suited to your database 
system, another one can be injected with the _lobHandler_ property.

Here is the declaration for Oracle:

{code}
    <bean id="lobHandler"
          class="org.springframework.jdbc.support.lob.OracleLobHandler">
        <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
    </bean>

    <bean id="nativeJdbcExtractor"
          class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>

    <bean id="repo"
          class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository">
        <constructor-arg name="transactionManager" ref="transactionManager"/>
        <constructor-arg name="repositoryName" value="aggregation"/>
        <constructor-arg name="dataSource" ref="dataSource"/>
        <!-- Only with Oracle, else use default -->
        <property name="lobHandler" ref="lobHandler"/>
    </bean>
{code}

h2. Feature

A feature has been created: *camel-jdbc-aggregator*.

{quote}
