Hello and thank you for your reply.

This is the code for aggregating the whole camel message:
    class ObjectsAggregationStrategy implements AggregationStrategy {

        final Logger logger =
Logger.getLogger(ObjectsAggregationStrategy.class);

        @SuppressWarnings("unchecked")
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
            List<Message> items;
            if (oldExchange == null) {
                items = new ArrayList<Message>();
                items.add(newExchange.getIn());
                Exchange copyExchange = newExchange.copy();
                copyExchange.getIn().setBody(items);
                return copyExchange;
            }

            items = oldExchange.getIn().getBody(List.class);
            Message newItem = newExchange.getIn();
            items.add(newItem);
            oldExchange.getIn().setBody(items);

            Exchange finalExchange = updateInMessageHeaders(oldExchange,
newExchange);
            finalExchange = updateExchangeProperties(oldExchange,
newExchange);

            return finalExchange;
        }

        private Exchange updateExchangeProperties(Exchange oldExchange,
Exchange newExchange) {
            CaseInsensitiveMap properties = new
CaseInsensitiveMap(newExchange.getProperties());

            for (Entry<String, Object> entry : properties.entrySet()) {
                oldExchange.setProperty(entry.getKey(), entry.getValue());
            }

            return oldExchange;
        }

        private Exchange updateInMessageHeaders(Exchange oldExchange,
Exchange newExchange) {
            CaseInsensitiveMap headers = new
CaseInsensitiveMap(newExchange.getIn().getHeaders());

            for (Entry<String, Object> entry : headers.entrySet()) {
                if
(entry.getKey().equals(MessageConstant.ERES_FILE_COMPLETE) &&
entry.getValue().equals(Boolean.TRUE)) {
                    logger.debug("Received and set the ERES_FILE_COMPLETE
header from routeId["
                            + oldExchange.getFromRouteId() + "].");
                }
                oldExchange.getIn().setHeader(entry.getKey(),
entry.getValue());
            }

            return oldExchange;
        }
    }


And this is the code for aggregating a serializable custom bean:
public class MessageAggregationStrategy<T> implements AggregationStrategy {
    final Logger logger =
Logger.getLogger(MessageAggregationStrategy.class);

    @SuppressWarnings("unchecked")
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        MessageBean<T> bean = setExchangeData(newExchange);
        List<MessageBean&lt;T>> items;
        if (oldExchange == null) {
            items = new ArrayList<MessageBean&lt;T>>();
            items.add(bean);
            Exchange copyExchange = newExchange.copy();
            copyExchange.getIn().setBody(items);
            return copyExchange;
        }

        items = oldExchange.getIn().getBody(List.class);
        items.add(bean);
        oldExchange.getIn().setBody(items);

        return oldExchange;
    }

    @SuppressWarnings("unchecked")
    private MessageBean<T> setExchangeData(Exchange exchange) {
        MessageBean<T> bean = new MessageBean<T>();
        bean.setBody((T) exchange.getIn().getBody());
        bean.setHeaders(new
CaseInsensitiveMap(exchange.getIn().getHeaders()));
        bean.setProperties(new
CaseInsensitiveMap(exchange.getProperties()));
        return bean;
    }


where MessageBean is:
public class MessageBean<T> implements Serializable {
    private static final long serialVersionUID = -987592973477513095L;
    private T body;
    private CaseInsensitiveMap properties;
    private CaseInsensitiveMap headers;

    public T getBody() {
        return body;
    }

    public CaseInsensitiveMap getProperties() {
        return properties;
    }

    public CaseInsensitiveMap getHeaders() {
        return headers;
    }

    public void setBody(T body) {
        this.body = body;
    }

    public void setProperties(CaseInsensitiveMap properties) {
        this.properties = properties;
    }

    public void setHeaders(CaseInsensitiveMap headers) {
        this.headers = headers;
    }
}




--
View this message in context: 
http://camel.465427.n5.nabble.com/Exceptions-when-aggregating-messages-tp5735523p5735560.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to