I found a really simple solution: I edited the camel-hazelcast component
directly ^^  The hazelcast seda endpoint is a mix of the seda and hazelcast
components, and it never starts a transaction.

I don't know if it's the best solution but it works :)

Prerequisite:

Transactions are configured with Camel.

First step: add an accessor to HazelcastSedaEndpoint.java to expose the
Hazelcast instance (see below):

public class HazelcastSedaEndpoint extends HazelcastDefaultEndpoint {

    // Attributes
    private HazelcastInstance hazelcastInstance;

    public HazelcastSedaEndpoint(final HazelcastInstance hazelcastInstance,
            final String uri, final HazelcastComponent component,
            final HazelcastSedaConfiguration configuration) {
...
        this.hazelcastInstance = hazelcastInstance;
...
    }
        
    public HazelcastInstance getHazelcastInstance() {
        return hazelcastInstance;
    }

}

Second step: change the run() method of HazelcastSedaConsumer.java to open
a transaction before polling the message from the seda queue:

public void run() {
        final BlockingQueue<?> queue = endpoint.getQueue();

        while (queue != null && isRunAllowed()) {
            final Exchange exchange = this.getEndpoint().createExchange();

            // Start a Hazelcast transaction unless one is already running
            final Transaction transaction =
                    endpoint.getHazelcastInstance().getTransaction();
            if (transaction != null
                    && transaction.getStatus() == Transaction.TXN_STATUS_NO_TXN) {
                transaction.begin();
            }
            try {
                final Object body = queue.poll(
                        endpoint.getConfiguration().getPollInterval(),
                        TimeUnit.MILLISECONDS);
                if (body != null) {
                    if (body instanceof DefaultExchangeHolder) {
                        DefaultExchangeHolder.unmarshal(exchange,
                                (DefaultExchangeHolder) body);
                    } else {
                        exchange.getIn().setBody(body);
                    }
                    try {
                        // process using the asynchronous routing engine
                        processor.process(exchange, new AsyncCallback() {
                            public void done(boolean asyncDone) {
                                // noop
                            }
                        });
                        if (exchange.getException() != null) {
                            if (transaction != null) {
                                transaction.rollback();
                            }
                            getExceptionHandler().handleException(
                                    "Error processing exchange", exchange,
                                    exchange.getException());
                        } 
                    } catch (Exception e) {
                        if (transaction != null) {
                            transaction.rollback();
                        }
                        LOG.error("Hzlq Exception caught: " + e, e);
                    }
                }
                // No error: commit if the transaction is still active
                if (exchange.getException() == null && transaction != null
                        && transaction.getStatus() == Transaction.TXN_STATUS_ACTIVE) {
                    transaction.commit();
                }
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Hzlq Consumer Interrupted: " + e, e);
                }
                continue;
            } catch (Throwable e) {
                if (transaction != null) {
                    transaction.rollback();
                }
                getExceptionHandler().handleException(
                        "Error processing exchange", exchange, e);
            }
        }
}
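
To show the begin / poll / commit-or-rollback pattern on its own, here is a
minimal sketch that runs without Camel or Hazelcast. TxQueue and consumeOne
are hypothetical stand-ins I made up for illustration (they are not part of
either library); the queue simply puts polled items back when the
transaction is rolled back.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Stand-in sketch; none of these types are Camel or Hazelcast classes. */
class TxPollSketch {

    /** Hypothetical queue whose polls are undone on rollback. */
    static final class TxQueue<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final Deque<T> polledInTx = new ArrayDeque<>();
        private boolean active;

        void offer(T item) { items.addLast(item); }

        int size() { return items.size(); }

        void begin() { active = true; }

        T poll() {
            T item = items.pollFirst();
            if (item != null && active) {
                polledInTx.addLast(item); // remember it in case of rollback
            }
            return item;
        }

        void commit() {
            polledInTx.clear();
            active = false;
        }

        void rollback() {
            // Put polled items back at the head, preserving their order.
            while (!polledInTx.isEmpty()) {
                items.addFirst(polledInTx.pollLast());
            }
            active = false;
        }
    }

    /** Polls one item inside a transaction; returns true if it committed. */
    static <T> boolean consumeOne(TxQueue<T> queue, Consumer<T> processor) {
        queue.begin();
        try {
            T body = queue.poll();
            if (body != null) {
                processor.accept(body);
            }
            queue.commit();
            return true;
        } catch (RuntimeException e) {
            queue.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        TxQueue<String> queue = new TxQueue<>();
        queue.offer("a");
        boolean committed = consumeOne(queue, body -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(committed + " " + queue.size());
    }
}
```

The point is only that the poll happens after begin(), so a failure in the
processor can undo it. In the real consumer that job is done by the
Hazelcast transaction, not by the queue wrapper shown here.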


Thanks to this change, my files in the MapStore are deleted only when my
transaction is committed.
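
One way to picture that behaviour is a store whose deletes are buffered
until commit. This is only a sketch under my own assumptions: BackingStore
and TxRemovals are hypothetical names, not Hazelcast classes, and the real
MapStore integration is handled by Hazelcast itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in sketch of deletes that only reach the store on commit. */
class DeferredDeleteSketch {

    /** Hypothetical persistent store (plays the role of the MapStore). */
    static final class BackingStore {
        final Map<Long, String> files = new HashMap<>();

        void delete(Long key) { files.remove(key); }
    }

    /** Buffers removals and applies them to the store only on commit. */
    static final class TxRemovals {
        private final BackingStore store;
        private final List<Long> pending = new ArrayList<>();

        TxRemovals(BackingStore store) { this.store = store; }

        void remove(Long key) { pending.add(key); }

        void commit() {
            for (Long key : pending) {
                store.delete(key);
            }
            pending.clear();
        }

        void rollback() { pending.clear(); } // nothing was deleted yet
    }

    public static void main(String[] args) {
        BackingStore store = new BackingStore();
        store.files.put(1L, "message-1");
        TxRemovals tx = new TxRemovals(store);
        tx.remove(1L);
        System.out.println("before commit: " + store.files.size());
        tx.commit();
        System.out.println("after commit: " + store.files.size());
    }
}
```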




--
View this message in context: 
http://camel.465427.n5.nabble.com/camel-hazelcast-Transactions-tp5726943p5727121.html
Sent from the Camel - Users mailing list archive at Nabble.com.
