Hi,
we have an application that is basically a "messaging bridge".
We come from one endpoint to another (or more).
Endpoints can either be IBM MqSeries, RabbitMq, Kafka, GCP Pub/Sub, some
database, and so.
The overall route is composed of 3 main parts :
- the inbound one that is basically a channel adapter : a standard component
that we customize for our purpose (but not that much)
- the "chore" one that does handle all the transformations (aggregator,
splitter, headerMapper, etc.)
- the outboud that is another channel adapter.
When we designed out components, we tried to make them agnostic : they are
neither in or outbound. They thus can be used in both cases.
(though this choix may be arguable, I think it is important to state it since
the example code below is used for both consumption and production).
The routes are going well... but for one case : when we consume from or publish
to MqSeries, we end up having terrible performances.
On a previous application, (implemented with another EIP framework), we had a
mean message rate of 40 m/s with only one consumer or producer.
Currently, our message rate dropped to 2 m/s with the JMS component.
(similar comparing conditions).
We could increase that amount but since the order of the messages is sometimes
vital, we cannot afford it as a permanent solution.
Another vital condition is the absolute necessity to not loose any message :
thus, we need the route to be transactional.
Thus, we need to increase that rate for a single consumer / producer without
dropping the transactional part.
We noticed that the component kept opening connections (most under TIME_WAIT
status, one - keeps changing - in ESTABLISED status).
Thus, we guessed that it may mean that connections are not cached and that the
recurring opening of connections may cause the terrible message rate.
We then tried to set up component cacheLevel to CACHE_CONSUMER and
CACHE_CONNECTION.... but it did not change the message rate (nor the many
connections)
We tried to set up a caching connection factory, but received a
"JmsConsumer[QUEUE.IN]] WARN o.a.c.c.j.DefaultJmsMessageListenerContainer -
Setup of JMS message listener invoker failed for destination 'QUEUE.IN' -
trying to recover. Cause: class jdk.proxy2.$Proxy75 cannot be cast to class
com.ibm.somejakarta.jms.MQSession (jdk.proxy2.$Proxy75 is in module jdk.proxy2
of loader 'app'; com.ibm.somejakarta.jms.MQSession is in unnamed module of
loader 'app')
We then tried with JmsPoolConnectionFactory, but (nearly) same error :
JmsConsumer[QUEUE.IN]] WARN o.a.c.c.j.DefaultJmsMessageListenerContainer -
Setup of JMS message listener invoker failed for destination 'QUEUE.IN' -
trying to recover. Cause: class org.messaginghub.pooled.jms.JmsPoolSession
cannot be cast to class com.ibm.somejakarta.jms.MQSession
(org.messaginghub.pooled.jms.JmsPoolSession and
com.ibm.somejakarta.jms.MQSession are in unnamed module of loader 'app')
We also tried to remove the transaction... and switched to CLIENT_ACK, but
though it improved a little bit (from 2 to 4 m/s), we are far from what we
expect (and since we need transactional, it sounds like not a suited solution).
So.... at this point, we are stuck with our transactional non-performant
consumer/producer.
Any idea on how to set it up in a way we those 3 constraints : performance,
transactionality, order?
Here are some pieces of our code :
public Component createMqSeriesComponent() {
JmsComponent component = new JmsComponent();
component.setConnectionFactory(createMqConnectionFactory(mq)); // or
one of the 2 other Factory creation methods
component.setDestinationResolver(new
WMQDestinationResolver(someTargetClient())); // for targetClient
component.setTransacted(true);
component.setTestConnectionOnStartup(true);
component.setReceiveTimeout(-1);
component.setExceptionListener(jmsExceptionListener);
component.setUsername(someUser); // not for Caching and Pool
component.setPassword(somePwd);// not for Caching and Pool
component.setAllowAdditionalHeaders("JMS_IBM_MQMD_.*");
component.setAcknowledgementModeName("SESSION_TRANSACTED");
component.setCacheLevelName("CACHE_SESSION");
return component;
}
private MQConnectionFactory createMqConnectionFactory(Mq mq) {
MQConnectionFactory connectionFactory = new MQConnectionFactory();
try {
connectionFactory.setHostName(someHosts());
connectionFactory.setPort(somePort());
connectionFactory.setQueueManager(someQm());
connectionFactory.setChannel(someChannel());
connectionFactory.setTransportType(1);
connectionFactory.setCCSID(someCcsid());
connectionFactory.setClientReconnectOptions(67108864);
} catch (JMSException e) {
throw new RuntimeException(e);
}
return connectionFactory;
}
private JmsPoolConnectionFactory createMqPoolingConnectionFactory(Mq mq) {
MQConnectionFactory connectionFactory = new MQConnectionFactory();
JmsPoolConnectionFactoryProperties properties = new
JmsPoolConnectionFactoryProperties();
properties.setIdleTimeout(Duration.ofSeconds(1));
properties.setMaxConnections(1);
JmsPoolConnectionFactoryFactory pooledConnectionFactoryFactory= new
JmsPoolConnectionFactoryFactory(properties);
UserCredentialsConnectionFactoryAdapter
userCredentialsConnectionFactoryAdapter = new
UserCredentialsConnectionFactoryAdapter();
JmsPoolConnectionFactory pooledConnectionFactory = null;
try {
connectionFactory.setHostName(someHosts);
connectionFactory.setPort(somePort);
connectionFactory.setQueueManager(someQm);
connectionFactory.setChannel(someChannel);
connectionFactory.setTransportType(1);
connectionFactory.setCCSID(someCcsid);
connectionFactory.setClientReconnectOptions(67108864);
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(connectionFactory);
userCredentialsConnectionFactoryAdapter.setUsername(someUser());
userCredentialsConnectionFactoryAdapter.setPassword(vaultConfig.getPassword(mq));
pooledConnectionFactory =
pooledConnectionFactoryFactory.createPooledConnectionFactory(userCredentialsConnectionFactoryAdapter);
} catch (JMSException e) {
throw new RuntimeException(e);
}
return pooledConnectionFactory;
}
private CachingConnectionFactory createMqCachingConnectionFactory(Mq mq) {
MQConnectionFactory connectionFactory = new MQConnectionFactory();
UserCredentialsConnectionFactoryAdapter
userCredentialsConnectionFactoryAdapter = new
UserCredentialsConnectionFactoryAdapter();
CachingConnectionFactory cachingConnectionFactory = null;
try {
connectionFactory.setHostName(someHosts);
connectionFactory.setPort(somePort);
connectionFactory.setQueueManager(someQm);
connectionFactory.setChannel(someChannel);
connectionFactory.setTransportType(1);
connectionFactory.setCCSID(someCcsid);
connectionFactory.setClientReconnectOptions(67108864);
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(connectionFactory);
userCredentialsConnectionFactoryAdapter.setUsername(someUser);
userCredentialsConnectionFactoryAdapter.setPassword(somePwd);
cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(50);
} catch (JMSException e) {
throw new RuntimeException(e);
}
return cachingConnectionFactory;
}
Thanks in advance.
Gaël LE BELLEGO
ATTENTION : Ce message et toutes les pièces jointes (ci-après le "message")
sont confidentiels et strictement réservés aux destinataires qui procèderont
aux vérifications appropriées en matière de virus. Toute utilisation ou
diffusion non autorisée est interdite.
Tout message électronique est susceptible d'altération. L'auteur de ce message
et le Groupe Open déclinent toute responsabilité au titre de ce message s'il a
été altéré, déformé, falsifié ou indûment utilisé par des tiers, ou encore s'il
a causé tout dommage ou perte de toute nature.
Si vous n'êtes pas destinataire de ce message, merci de le détruire
immédiatement et d'avertir l'expéditeur.
WARNING : This message and any attachments (the "message") are confidential and
strictly intended for their addressees, who will conduct appropriate virus
checks. Any unauthorised use or dissemination is prohibited.
Messages are susceptible to alteration. The author of this message or Open
Group shall not be liable for the message if altered, changed, falsified or
unduly used by third parties, or for any damage or loss.
If you are not receiver of this message, please cancel it immediately and
inform the sender.
________________________________
Open, responsable de traitement, met en œuvre des traitements de données à
caractère personnel.
Pour en savoir plus :
https://www.open.global/politique-de-protection-donnees-personnelles
Open, data controller, processes personal data.
Click the link below to find out more details:
https://www.open.global/politique-de-protection-donnees-personnelles