JDBC Persistent Aggregator -------------------------- Key: CAMEL-3502 URL: https://issues.apache.org/jira/browse/CAMEL-3502 Project: Camel Issue Type: New Feature Affects Versions: 2.6.0 Environment: JDBC Aggregator Database Reporter: Olivier Roger
The patch provided is an implementation of a Aggregator for JDBC. It implements *org.apache.camel.spi.AggregationRepository* (2.4+) and has been tested on Camel 2.4 and 2.6. Some details have been disscussed on the [mailling list|http://camel.465427.n5.nabble.com/Aggregator-Persistence-td2800301.html] with Claus. The implementation is very similar to the one for HawtDB. Unit tests have been included and use the H2 in-memory database. About that, the Test *JdbcAggregateLoadAndRecoverTest* has been adapted to exclude the redeliveries from the failling messages. It is possible for a message to be sent due to a _scan()_ background operation evn before it is send by the _onCompletion()_ method ? Here is a small documentation {quote} h2. Database To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "_COMPLETED". The name must be configured in the Spring bean with the _RepositoryName_ property. In the following example aggregation will be used. The table structure definition of both table are identical: in both case a String value is used as key (*id*) whereas a Blob contains the exchange serialized in byte array. However one difference should be remembered: the *id* field does not have the same content depending on the table. In the aggregation table *id* holds the correlation Id used by the component to aggregate the messages. In the completed table, *id* holds the id of the exchange stored in corresponding the blob field. Here is the SQL query used to create the tables, just replace "aggregation" with your aggregator repository name. {code} CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) ); {code} h3. Codec (Serialization) Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the *JdbcCodec* class. One detail of the code requires your attention: the *ClassLoadingAwareObjectInputStream*. The *ClassLoadingAwareObjectInputStream* has been reused from the ActiveMQ project. It wraps an *ObjectInputStream* and use it with the ContextClassLoader rather than the currentThread one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references. h2. Transactional TransactionTemplate is use to wrap all the calls to the database. Therefore a Transaction Manager is required (see bean-declaration) h2. Service (Start/Stop) The *JdbcAggregationRepository* extends ServiceSupport. This includes the components in Camel component life cycle. The _start()_ method verify the connection of the database and the presence of the required tables. h2. Aggregator configuration Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the _lobHandler_ property. Here is the declaration for Oracle : {code} <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository"> <constructor-arg name="transactionManager" ref="transactionManager"/> <constructor-arg name="repositoryName" value="aggregation"/> <constructor-arg name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean> {code} h2. Feature A feature has been created : *camel-jdbc-aggregator*. {quote} -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.