Hi,

We have an application that is basically a "messaging bridge".
It routes messages from one endpoint to one or more other endpoints.
Endpoints can be IBM MqSeries, RabbitMq, Kafka, GCP Pub/Sub, some
database, and so on.

The overall route is composed of 3 main parts:
- the inbound one, which is basically a channel adapter: a standard component
that we customize for our purpose (but not that much)
- the "core" one, which handles all the transformations (aggregator,
splitter, headerMapper, etc.)
- the outbound one, which is another channel adapter.

When we designed our components, we tried to make them agnostic: they are
neither inbound nor outbound, so they can be used in both cases.
(Though this choice may be arguable, I think it is important to state it, since
the example code below is used for both consumption and production.)

The routes work well... except in one case: when we consume from or publish
to MqSeries, we end up with terrible performance.
In a previous application (implemented with another EIP framework), we had a
mean message rate of 40 msg/s with only one consumer or producer.
Currently, our message rate has dropped to 2 msg/s with the JMS component
(under similar conditions).
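
To put those two rates in perspective, here is a minimal sketch of the arithmetic, using only the figures quoted above. The class and method names are made up for illustration. At 40 msg/s each message has an average budget of 25 ms, whereas at 2 msg/s each message takes about 500 ms on average, which is the order of magnitude one might expect if some expensive setup step were repeated for every message (that last part is a guess, not a measurement).

```java
public class MessageRateBudget {

    /** Average time spent per message, in milliseconds, at a given rate in messages per second. */
    public static double millisPerMessage(double messagesPerSecond) {
        if (messagesPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return 1000.0 / messagesPerSecond;
    }

    public static void main(String[] args) {
        double previous = millisPerMessage(40); // previous application, single consumer or producer
        double current = millisPerMessage(2);   // current JMS component, similar conditions
        System.out.printf("previous: %.0f ms/msg, current: %.0f ms/msg, slowdown: x%.0f%n",
                previous, current, current / previous);
    }
}
```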


We could increase the number of consumers, but since the order of the messages
is sometimes vital, we cannot afford that as a permanent solution.
Another vital condition is that we absolutely must not lose any message:
the route therefore needs to be transactional.

So we need to increase that rate for a single consumer / producer without
dropping the transactional part.

We noticed that the component keeps opening connections (most in TIME_WAIT
state, and one, which keeps changing, in ESTABLISHED state).
We therefore guessed that connections are not cached and that the
repeated opening of connections may cause the terrible message rate.

We then tried setting the component cacheLevel to CACHE_CONSUMER and
CACHE_CONNECTION... but it changed neither the message rate nor the number of
connections.

We tried to set up a caching connection factory, but received:
"JmsConsumer[QUEUE.IN]] WARN  o.a.c.c.j.DefaultJmsMessageListenerContainer - 
Setup of JMS message listener invoker failed for destination 'QUEUE.IN' - 
trying to recover. Cause: class jdk.proxy2.$Proxy75 cannot be cast to class 
com.ibm.somejakarta.jms.MQSession (jdk.proxy2.$Proxy75 is in module jdk.proxy2 
of loader 'app'; com.ibm.somejakarta.jms.MQSession is in unnamed module of 
loader 'app')"

We then tried with JmsPoolConnectionFactory, but got (nearly) the same error:
JmsConsumer[QUEUE.IN]] WARN  o.a.c.c.j.DefaultJmsMessageListenerContainer - 
Setup of JMS message listener invoker failed for destination 'QUEUE.IN' - 
trying to recover. Cause: class org.messaginghub.pooled.jms.JmsPoolSession 
cannot be cast to class com.ibm.somejakarta.jms.MQSession 
(org.messaginghub.pooled.jms.JmsPoolSession and 
com.ibm.somejakarta.jms.MQSession are in unnamed module of loader 'app')
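
Both errors have the same shape: an object that only implements the session interface (a JDK dynamic proxy in the first case, a pooling wrapper in the second) is downcast to the vendor's concrete session class. The sketch below reproduces that shape with plain JDK classes only. `Session`, `VendorSession`, `PooledSession` and `wrapInProxy` are hypothetical stand-ins, not the JMS, IBM MQ, Spring or Camel APIs, and it does not establish where the downcast actually happens in our stack.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class WrappedSessionCastDemo {

    /** Stand-in for the session interface (hypothetical, not the real JMS API). */
    public interface Session {
        String send(String body);
    }

    /** Stand-in for a vendor implementation such as MQSession (hypothetical). */
    public static class VendorSession implements Session {
        @Override
        public String send(String body) {
            return "vendor:" + body;
        }
    }

    /** Hand-written wrapper, similar in shape to a pooled session (hypothetical). */
    public static class PooledSession implements Session {
        private final Session delegate;

        public PooledSession(Session delegate) {
            this.delegate = delegate;
        }

        @Override
        public String send(String body) {
            return delegate.send(body);
        }
    }

    /** Wraps a session in a JDK dynamic proxy that forwards every call to the target. */
    public static Session wrapInProxy(Session target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return (Session) Proxy.newProxyInstance(
                Session.class.getClassLoader(), new Class<?>[] {Session.class}, handler);
    }

    /** Returns true only if the session can be downcast to the vendor class. */
    public static boolean canCastToVendor(Session session) {
        try {
            VendorSession vendor = (VendorSession) session;
            return vendor != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Session raw = new VendorSession();
        Session proxied = wrapInProxy(raw);
        Session pooled = new PooledSession(raw);

        // Calls through the interface work for all three objects.
        System.out.println(proxied.send("hello"));
        System.out.println(pooled.send("hello"));

        // Only the unwrapped object survives a downcast to the vendor class.
        System.out.println("raw castable:     " + canCastToVendor(raw));
        System.out.println("proxied castable: " + canCastToVendor(proxied));
        System.out.println("pooled castable:  " + canCastToVendor(pooled));
    }
}
```

If this is what happens here, any code that needs the vendor session would have to receive the unwrapped object, or avoid the downcast altogether.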

We also tried removing the transaction and switching to CLIENT_ACK, but
although it improved things a little (from 2 to 4 msg/s), we are still far from
what we expect (and since we need transactions, it is not a suitable solution
anyway).


So... at this point, we are stuck with our transactional but slow
consumer/producer.
Any idea how to set it up in a way that meets those 3 constraints: performance,
transactionality, order?


Here are some pieces of our code:

    public Component createMqSeriesComponent() {

        JmsComponent component = new JmsComponent();
        // or one of the 2 other factory creation methods
        component.setConnectionFactory(createMqConnectionFactory(mq));
        // for targetClient
        component.setDestinationResolver(new WMQDestinationResolver(someTargetClient()));
        component.setTransacted(true);
        component.setTestConnectionOnStartup(true);
        component.setReceiveTimeout(-1);
        component.setExceptionListener(jmsExceptionListener);
        component.setUsername(someUser); // not for Caching and Pool
        component.setPassword(somePwd);// not for Caching and Pool
        component.setAllowAdditionalHeaders("JMS_IBM_MQMD_.*");
        component.setAcknowledgementModeName("SESSION_TRANSACTED");
        component.setCacheLevelName("CACHE_SESSION");
        return component;
    }


    private MQConnectionFactory createMqConnectionFactory(Mq mq) {

        MQConnectionFactory connectionFactory = new MQConnectionFactory();
        try {
            connectionFactory.setHostName(someHosts());
            connectionFactory.setPort(somePort());
            connectionFactory.setQueueManager(someQm());
            connectionFactory.setChannel(someChannel());
            connectionFactory.setTransportType(1);
            connectionFactory.setCCSID(someCcsid());
            connectionFactory.setClientReconnectOptions(67108864);

        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        return connectionFactory;
    }

    private JmsPoolConnectionFactory createMqPoolingConnectionFactory(Mq mq) {

        MQConnectionFactory connectionFactory = new MQConnectionFactory();
        JmsPoolConnectionFactoryProperties properties =
                new JmsPoolConnectionFactoryProperties();
        properties.setIdleTimeout(Duration.ofSeconds(1));
        properties.setMaxConnections(1);
        JmsPoolConnectionFactoryFactory pooledConnectionFactoryFactory =
                new JmsPoolConnectionFactoryFactory(properties);
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter =
                new UserCredentialsConnectionFactoryAdapter();
        JmsPoolConnectionFactory pooledConnectionFactory = null;

        try {
            connectionFactory.setHostName(someHosts);
            connectionFactory.setPort(somePort);
            connectionFactory.setQueueManager(someQm);
            connectionFactory.setChannel(someChannel);
            connectionFactory.setTransportType(1);
            connectionFactory.setCCSID(someCcsid);
            connectionFactory.setClientReconnectOptions(67108864);

            userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(connectionFactory);
            userCredentialsConnectionFactoryAdapter.setUsername(someUser());
            userCredentialsConnectionFactoryAdapter.setPassword(vaultConfig.getPassword(mq));

            pooledConnectionFactory =
                    pooledConnectionFactoryFactory.createPooledConnectionFactory(userCredentialsConnectionFactoryAdapter);

        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        return pooledConnectionFactory;
    }

    private CachingConnectionFactory createMqCachingConnectionFactory(Mq mq) {

        MQConnectionFactory connectionFactory = new MQConnectionFactory();
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter =
                new UserCredentialsConnectionFactoryAdapter();
        CachingConnectionFactory cachingConnectionFactory = null;

        try {
            connectionFactory.setHostName(someHosts);
            connectionFactory.setPort(somePort);
            connectionFactory.setQueueManager(someQm);
            connectionFactory.setChannel(someChannel);
            connectionFactory.setTransportType(1);
            connectionFactory.setCCSID(someCcsid);
            connectionFactory.setClientReconnectOptions(67108864);

            userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(connectionFactory);
            userCredentialsConnectionFactoryAdapter.setUsername(someUser);
            userCredentialsConnectionFactoryAdapter.setPassword(somePwd);

            cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(50);

        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        return cachingConnectionFactory;
    }



Thanks in advance.


Gaël LE BELLEGO


