Hi Zoran,

thanks for your reply. I added my notes below.

> -----Original Message-----
> From: Zoran Regvart [mailto:zo...@regvart.com]
> Sent: Sunday, 20 August 2017 00:52
> To: users@camel.apache.org
> Subject: Re: Race Condition in Aggregation using
> HazelcastAggregationRepository in a cluster
> 
> Hi Michael,
> it's a bit hard to follow so I could be misunderstanding your issue;
> is your issue that there is a race condition between the aggregator
> that expects the reply on node A and another aggregator that is not
> aware of the initial request on node B?

That's pretty close, but node A is not expecting the reply. It should not make
a difference whether the reply is consumed on node A or node B. That is why we
used the HazelcastAggregationRepository in the first place.

The problem is that AggregateProcessor uses a synchronization lock so that
other threads have to wait to access the repository. But if those threads run
on another node, and therefore in another JVM, that lock is not sufficient.
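
To illustrate, here is a minimal sketch in plain Java (not Camel code) that
replays the interleaving deterministically. A ConcurrentHashMap stands in for
the shared Hazelcast map, and the class and method names are made up for this
example. Each node reads while holding only its own JVM-local lock, so neither
read is blocked by the other node:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; none of these names exist in Camel.
final class LocalLockRace {

    // Stand-in for an aggregation strategy: append the incoming message.
    static String merge(String existing, String incoming) {
        return existing == null ? incoming : existing + "+" + incoming;
    }

    // Replays the problematic order: both nodes read before either writes.
    // Returns what is left in the shared repository afterwards.
    static String replayLostUpdate(Map<String, String> sharedRepo, String key) {
        String seenByNodeA = sharedRepo.get(key); // node A, holding lock A: null
        String seenByNodeB = sharedRepo.get(key); // node B, holding lock B: null

        sharedRepo.put(key, merge(seenByNodeA, "request")); // node A writes
        sharedRepo.put(key, merge(seenByNodeB, "reply"));   // node B overwrites

        return sharedRepo.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> repo = new ConcurrentHashMap<>();
        System.out.println(replayLostUpdate(repo, "id-1"));
    }
}
```

Starting from an empty map, only "reply" is left under the key afterwards. The
"request" entry is lost, so a completion size of 2 is never reached.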

> 
> If you're doing only request-reply correlation perhaps take a look at
> InOut message exchange pattern with a correlation property[1] with the
> replying application setting the ReplyToQMgr to the requester's queue
> manager.

We need the aggregator to store a process instance ID from a BPMN process
that uses the request route to send the message. When the response comes back,
we fetch this ID from the aggregated message in order to send the response to
the correct process instance.
We decided against the InOut exchange pattern because the response would not be
processed if the requester's node fails. Instead, we implemented the
request-reply correlation ourselves, using JMSMessageID and JMSCorrelationID as
the aggregation condition. Because of the HazelcastAggregationRepository,
either node can handle any incoming message.

> 
> Or, place the reply in a Hazelcast queue regardless of the queue
> manager the reply landed on and process the reply from there.

That's a good idea. I'll have a look at this. 

> 
> Also I think that it would be better to setup the reply coordination
> expectation (with timeouts and without transactions -- that would
> block) before sending the message.
> 

Sorry, but I don't think I understood this. The message is sent without any
transaction, and the timeout is implemented using the timeout mechanism of the
aggregation. We did it this way because we don't need any coordination between
the nodes beyond the aggregation repository.

> 2c

By the way: I now use optimistic locking, but I had to fix it in the
HazelcastAggregationRepository (see
https://github.com/thuri/hazelcastrepository-optimistic-issue) to make it work.
I'll send a PR in a few days.
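
The idea behind the optimistic approach, as a rough sketch in plain Java
rather than the actual repository code: a write only succeeds if the stored
value is still the one that was read beforehand; otherwise the caller reads
again and retries. A ConcurrentMap stands in for the Hazelcast map here, and
the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration; not the HazelcastAggregationRepository code.
final class OptimisticAggregate {

    // Merges 'incoming' into the entry for 'key' with compare-and-set
    // semantics and returns the value that was actually stored.
    static String aggregate(ConcurrentMap<String, String> repo,
                            String key, String incoming) {
        while (true) {
            String old = repo.get(key);
            String merged = (old == null) ? incoming : old + "+" + incoming;

            boolean stored = (old == null)
                    ? repo.putIfAbsent(key, merged) == null // only if still absent
                    : repo.replace(key, old, merged);       // only if unchanged

            if (stored) {
                return merged;
            }
            // Another writer got in between the read and the write:
            // loop, read the fresh value and try again.
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> repo = new ConcurrentHashMap<>();
        aggregate(repo, "id-1", "request");
        System.out.println(aggregate(repo, "id-1", "reply"));
    }
}
```

Whichever node writes second sees the first node's entry (or fails its
conditional write and retries), so both messages end up in the same
aggregate instead of one overwriting the other.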

Regards,
Michael

> 
> [1] https://camel.apache.org/correlation-identifier.html
> 
> On Wed, Aug 16, 2017 at 5:10 PM, Michael Lück <michael.lu...@hm-ag.de>
> wrote:
> > Hi there,
> >
> > we just had an issue in one of our systems and it looks like there is an
> > issue with locking in the AggregateProcessor in a
> > distributed environment.
> >
> > I’ll try to explain it:
> >
> > ===================================================
> > Scenario
> > ===================================================
> >
> > We use camel-core and camel-hazelcast 2.16.5 and Hazelcast 3.5.2.
> >
> > We have a route which sends a message to a WebSphere MQ queue (via
> > JMSComponent), and after that we put the message into an aggregator which
> > uses the JMSCorrelationID to correlate the request and the response.
> >
> > from(epAggregation)
> >   .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
> >   .completionTimeout(Integer.parseInt(
> >       getContext().resolvePropertyPlaceholders("{{timeout}}")))
> >   .completionSize(2)
> >   .aggregationRepository(aggrRepo)
> >
> > The aggregationRepository aggrRepo is created like this:
> >   HazelcastAggregationRepository aggrRepo =
> >       new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);
> > where hcInst is an instance of com.hazelcast.core.HazelcastInstance.
> >
> > We also have another route which reads the response from the response
> > queue and forwards it to the aggregator.
> >
> > The environment consists of two nodes on which the same code is running
> > (so essentially the send and response routes and the aggregation).
> >
> > The problem arises when the response is returned really fast and is
> > consumed on the node that didn't send the request.
> >
> > ========================================
> > Analysis
> > ========================================
> >
> > I dug a bit into the Camel code, and it seems to me that the problem here
> > is the lock in the AggregateProcessor, as it is local to the VM in which
> > the code runs. I'll try to make this clearer with an example:
> >
> > - Node A sends an MQ message and after that puts the message into the
> >   aggregator. The AggregateProcessor runs and checks the lock before going
> >   into doAggregation().
> >         - In doAggregation() it tries to get the Exchange from the
> >           repository and doesn't find any. So it continues to aggregate
> >           the first message and writes it into the repository.
> > - At about the same time, after node A has read from the repository but
> >   before it has written the "aggregated" first message, node B fetches the
> >   reply from the response queue and sends it to the aggregator. As on node
> >   A, the lock is checked, and as the code runs in another VM, the lock is
> >   free and the AggregateProcessor can go into doAggregation().
> >         - In doAggregation() node B tries to get the Exchange from the
> >           repository before the other node has written it. Like node A,
> >           the code proceeds with creating the first Exchange for the
> >           aggregation and writes it into the repository.
> >
> > The result is that one of the nodes overwrites the Exchange the other
> > created before, and the aggregation never completes (actually it does,
> > but only because of the timeout).
> >
> > =========================================
> > Ideas to solve the problem
> > =========================================
> > - Optimistic locking is probably an option here, as
> >   HazelcastAggregationRepository supports it by implementing
> >   OptimisticLockingAggregationRepository.
> > => I'd like to hear your thoughts on this.
> >
> > - Currently we can stop the route that consumes the responses on one node
> >   to eliminate the error. But this is not a long-term option because we
> >   lose the ability to fail over.
> > - Another idea is to make the AggregateProcessor get the lock object from
> >   the repository. For example, the HazelcastAggregationRepository could
> >   return the lock object for the Hazelcast map, which would lock it for
> >   the whole cluster.
> > - I thought about resending the MQ message in case of a timeout, but as
> >   the request has side effects on the system that processes the message,
> >   this is not really an option.
> >
> > I hope I could make myself clear.
> > If you have any questions which would help you to help me, I'd be happy to
> > answer them.
> >
> > Regards,
> > Michael
> >
> >
> >
> 
> 
> 
> --
> Zoran Regvart
