[ https://issues.apache.org/jira/browse/CAMEL-3502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-3502: ------------------------------- Fix Version/s: 2.7.0 > JDBC Persistent Aggregator > -------------------------- > > Key: CAMEL-3502 > URL: https://issues.apache.org/jira/browse/CAMEL-3502 > Project: Camel > Issue Type: New Feature > Affects Versions: 2.6.0 > Environment: JDBC Aggregator Database > Reporter: Olivier Roger > Fix For: 2.7.0 > > Attachments: jdbc-aggregator.patch > > > The patch provided is an implementation of a Aggregator for JDBC. > It implements *org.apache.camel.spi.AggregationRepository* (2.4+) and has > been tested on Camel 2.4 and 2.6. > Some details have been disscussed on the [mailling > list|http://camel.465427.n5.nabble.com/Aggregator-Persistence-td2800301.html] > with Claus. > The implementation is very similar to the one for HawtDB. > Unit tests have been included and use the H2 in-memory database. > About that, the Test *JdbcAggregateLoadAndRecoverTest* has been adapted to > exclude the redeliveries from the failling messages. It is possible for a > message to be sent due to a _scan()_ background operation evn before it is > send by the _onCompletion()_ method ? > Here is a small documentation > {quote} > h2. Database > To be operational, each aggregator uses two table: the aggregation and > completed one. By convention the completed has the same name as the > aggregation one suffixed with "_COMPLETED". The name must be configured in > the Spring bean with the _RepositoryName_ property. In the following example > aggregation will be used. > The table structure definition of both table are identical: in both case a > String value is used as key (*id*) whereas a Blob contains the exchange > serialized in byte array. > However one difference should be remembered: the *id* field does not have the > same content depending on the table. > In the aggregation table *id* holds the correlation Id used by the component > to aggregate the messages. In the completed table, *id* holds the id of the > exchange stored in corresponding the blob field. > Here is the SQL query used to create the tables, just replace "aggregation" > with your aggregator repository name. > {code} > CREATE TABLE aggregation ( > id varchar(255) NOT NULL, > exchange blob NOT NULL, > constraint aggregation_pk PRIMARY KEY (id) > ); > CREATE TABLE aggregation_completed ( > id varchar(255) NOT NULL, > exchange blob NOT NULL, > constraint aggregation_completed_pk PRIMARY KEY (id) > ); > {code} > h3. Codec (Serialization) > Since they can contain any type of payload, Exchanges are not serializable by > design. It is converted into a byte array to be stored in a database BLOB > field. > All those conversions are handled by the *JdbcCodec* class. One detail of the > code requires your attention: the *ClassLoadingAwareObjectInputStream*. > The *ClassLoadingAwareObjectInputStream* has been reused from the ActiveMQ > project. It wraps an *ObjectInputStream* and use it with the > ContextClassLoader rather than the currentThread one. The benefit is to be > able to load classes exposed by other bundles. > This allows the exchange body and headers to have custom types object > references. > h2. Transactional > TransactionTemplate is use to wrap all the calls to the database. > Therefore a Transaction Manager is required (see bean-declaration) > h2. Service (Start/Stop) > The *JdbcAggregationRepository* extends ServiceSupport. This includes the > components in Camel component life cycle. > The _start()_ method verify the connection of the database and the presence > of the required tables. > h2. Aggregator configuration > Depending on the targeted environment, the aggregator might need some > configuration. As you already know, each aggregator should have its own > repository (with the corresponding pair of table created in the database) and > a data source. If the default lobHandler is not adapted to your database > system, it can be injected with the _lobHandler_ property. > Here is the declaration for Oracle : > {code} > <bean id="lobHandler" > class="org.springframework.jdbc.support.lob.OracleLobHandler"> > <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> > </bean> > <bean id="nativeJdbcExtractor" > class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> > <bean id="repo" > class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository"> > <constructor-arg name="transactionManager" ref="transactionManager"/> > <constructor-arg name="repositoryName" value="aggregation"/> > <constructor-arg name="dataSource" ref="dataSource"/> > <!-- Only with Oracle, else use default --> > <property name="lobHandler" ref="lobHandler"/> > </bean> > {code} > h2. Feature > A feature has been created : *camel-jdbc-aggregator*. > {quote} -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.