How to use RoutePolicy to prevent a route from starting

2023-08-28 Thread Han Yainsun
Hi Camel Community,

Greetings to you!

Camel version: 3.20.4
Spring Boot version: 2.7.11
JDK: Amazon Corretto 17
Platform: Windows server 2019
IDE: IntelliJ IDEA 2021.3.2 (Community Edition)
Stack Traces & logging output: attached at the end

I have a case where two routes use the same HTTP listening URL but different
backends: one is the master and the other is the backup. Normally, during Camel
startup only the master route should be started, and the backup route should
remain stopped. When the master route's backend has an issue, we can manually
start the backup route and stop the master route.

The problem is that during startup Camel tries to start all routes and fails
with the error below:

---
 Application run 
failed","logger_name":"org.springframework.boot.SpringApplication","thread_name":"main","level":"ERROR","level_value":4,"stack_trace":"org.apache.camel.FailedToStartRouteException:
 Failed to start route SMS_JSON_MsgFlow because of Multiple consumers for the 
same endpoint is not allowed: 
jetty:http://0.0.0.0:7806/esb00/helmjson?httpMethodRestrict=POST&matchOnUriPrefix=true
---

I read the Camel documentation and it seems RoutePolicy[1] could be the
solution, so I took the class ThrottlingInflightRoutePolicy as a reference,
extended RoutePolicySupport and overrode the onStart method as below. However,
it does not seem to work: I can see it print the log "stop omni channel
interface: " on the console, but the CamelContext still tries to start the
route and hits the same error.

Could you kindly advise on this? Thanks in advance!

---
@Override
public void onStart(Route route) {
    System.out.println("starting route: " + route.getRouteId());
    boolean startOmniChannel = Boolean.parseBoolean(String.valueOf(
            camelContext.getPropertiesComponent().resolveProperty("start_omni_channel")));
    if (!startOmniChannel) {
        try {
            System.out.println("stop omni channel interface: " + route.getRouteId());
            suspendOrStopConsumer(route.getConsumer());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
---
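
One detail in the snippet above may be worth checking separately from the
startup problem. Assuming resolveProperty returns an Optional<String> (as the
PropertiesComponent interface in Camel 3.x appears to declare),
String.valueOf(...) produces text such as "Optional[true]", which
Boolean.parseBoolean reads as false. The stdlib-only sketch below uses a
hypothetical resolve method as a stand-in for the Camel call; the class and
method names are made up for this example.

```java
import java.util.Map;
import java.util.Optional;

// Stdlib-only illustration; resolve() is a hypothetical stand-in for
// camelContext.getPropertiesComponent().resolveProperty(key).
final class OptionalFlagSketch {
    static Optional<String> resolve(Map<String, String> props, String key) {
        return Optional.ofNullable(props.get(key));
    }

    // Mirrors the parsing in the post: stringifies the Optional wrapper itself.
    static boolean parseViaValueOf(Optional<String> value) {
        return Boolean.parseBoolean(String.valueOf(value));
    }

    // Unwraps the Optional first; a missing property falls back to false.
    static boolean parseUnwrapped(Optional<String> value) {
        return value.map(Boolean::parseBoolean).orElse(false);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("start_omni_channel", "true");
        Optional<String> flag = resolve(props, "start_omni_channel");
        System.out.println("stringified:      " + String.valueOf(flag));
        System.out.println("parsed as posted: " + parseViaValueOf(flag));
        System.out.println("parsed unwrapped: " + parseUnwrapped(flag));
    }
}
```

If that assumption holds, the flag in the posted code would evaluate to false
regardless of the configured value.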

Stack Traces & logging output:
---
{"@timestamp":"2023-08-28T10:58:07.52+08:00","@version":"1","message":"Apache 
Camel 3.20.4 (mini-esb) is 
starting","logger_name":"org.apache.camel.impl.engine.AbstractCamelContext","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:07.62+08:00","@version":"1","message":"Starting 
CamelMainRunController to ensure the main thread keeps 
running","logger_name":"org.apache.camel.spring.boot.CamelSpringBootApplicationListener","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:07.622+08:00","@version":"1","message":"Apache 
Camel (Main) 3.20.4 is 
starting","logger_name":"org.apache.camel.main.MainSupport","thread_name":"CamelMainRunController","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:07.642+08:00","@version":"1","message":"Logging 
initialized @5011ms to 
org.eclipse.jetty.util.log.Slf4jLog","logger_name":"org.eclipse.jetty.util.log","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:08.1+08:00","@version":"1","message":"jetty-9.4.51.v20230217;
 built: 2023-02-17T08:19:37.309Z; git: 
b45c405e4544384de066f814ed42ae3dceacdd49; jvm 
17.0.7+7-LTS","logger_name":"org.eclipse.jetty.server.Server","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:08.53+08:00","@version":"1","message":"Started 
o.e.j.s.ServletContextHandler@4c635edc{/,null,AVAILABLE}","logger_name":"org.eclipse.jetty.server.handler.ContextHandler","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:08.994+08:00","@version":"1","message":"Started 
ServerConnector@3009eed7{HTTP/1.1, 
(http/1.1)}{0.0.0.0:7806}","logger_name":"org.eclipse.jetty.server.AbstractConnector","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:08.994+08:00","@version":"1","message":"Started 
@6363ms","logger_name":"org.eclipse.jetty.server.Server","thread_name":"main","level":"INFO","level_value":2}
{"@timestamp":"2023-08-28T10:58:09.011+08:00","@version":"1","message":"Error 
starting CamelContext (mini-esb) due to exception thrown: Failed to start route 
SMS_JSON_MsgFlow because of Multiple consumers for the same endpoint is not 
allowed: 
jetty:http://0.0.0.0:7806/esb00/helmjson?httpMethodRestrict=POST&matchOnUriPrefix=true","logger_name":"org.apache.camel.impl.engine.AbstractCamelContext","thread_name":"main","level":"ERROR","level_value":4,"stack_trace":"org.apache.camel.FailedToStartRouteException:
 Failed to start route SMS_JSON_MsgFlow because of Multiple consumers for the 
same endpoint is not allowed: 
jetty:http://0.0.0.0:7806/esb00/helmjson?httpMethodRestrict=POST&matchOnUriPrefix=true\r\n\tat
 
org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRouteConsumers(I

Global limit to threads count used by Camel (engine + routes pools + ..)

2023-08-28 Thread Modanese, Riccardo
Hello to everyone,
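 I’m trying to cap the number of threads used by my Camel routes.

I have several routes (the number can vary from time to time) and I created a
thread pool profile like this one (Camel 3.x with Spring Boot 2.5.x):

<threadPoolProfile id="myRoutePoolProfile"
    poolSize="…"
    maxPoolSize="…"
    maxQueueSize="…"
    rejectedPolicy="…"/>
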
My routes have a recipient list and an error handler (no multicast).
I set the same thread pool profile to each error handler and recipient list:
executorServiceRef="myRoutePoolProfile"

Looking at the threads instantiated by the JVM (I noticed a large amount of
allocated memory that keeps growing for a while), I saw that the recipient list
allocates a new thread pool with the configuration I provided
(myRoutePoolProfile) for each route instead of sharing one pool across all the
routes.

What I need is to limit the threads used by all the routes and the Camel engine
itself by setting a global limit.
That way I can be confident of a predictable amount of memory used by the JVM
(once heap, code cache and so on are set).

Do you have any idea or documentation to point me to?

Thanks for your support.

Regards,
Riccardo Modanese


Re: Global limit to threads count used by Camel (engine + routes pools + ..)

2023-08-28 Thread Claus Ibsen
Hi

A thread pool profile is just a template for sharing the same settings, not
the same pool instance.
Instead use <threadPool> to create a pool instance and share this.

See more in the docs
https://camel.apache.org/manual/threading-model.html
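
To illustrate the distinction with plain java.util.concurrent classes (a sketch
only; SharedPoolSketch and its methods are hypothetical names, and Camel's own
pool creation is not shown): building a pool from the same settings once per
route multiplies the thread count, while sharing one pool instance keeps it at
the configured size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Stdlib-only illustration; the names here are hypothetical, not Camel API.
final class SharedPoolSketch {
    static final int POOL_SIZE = 2; // stands in for the settings held by a profile

    // A profile is only a template: every call builds a new pool from the same settings.
    static ThreadPoolExecutor newPoolFromProfile() {
        return new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
    }

    // Submits five tasks per route and returns the number of threads created in total.
    static int threadsCreated(int routes, boolean sharePool) {
        List<ThreadPoolExecutor> pools = new ArrayList<>();
        if (sharePool) {
            pools.add(newPoolFromProfile()); // one instance used by every route
        }
        for (int r = 0; r < routes; r++) {
            if (!sharePool) {
                pools.add(newPoolFromProfile()); // a fresh pool for each route
            }
            ThreadPoolExecutor pool = pools.get(pools.size() - 1);
            for (int t = 0; t < 5; t++) {
                pool.execute(() -> { });
            }
        }
        int total = 0;
        for (ThreadPoolExecutor pool : pools) {
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            total += pool.getLargestPoolSize();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("per-route pools: " + threadsCreated(4, false) + " threads");
        System.out.println("one shared pool: " + threadsCreated(4, true) + " threads");
    }
}
```

In this sketch four routes with their own pools end up with four times the
threads of the shared variant, which matches the growth described in the
question.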

On Mon, Aug 28, 2023 at 9:53 AM Modanese, Riccardo wrote:

> Hello to everyone,
>  I’m working to place a cap to the threads used by my Camel routes.
>
> I have several routes (the number can vary time to time) and I created a
> thread pool profile like this one (Camel 3.x with SpringBoot 2.5.x)
> <threadPoolProfile id="myRoutePoolProfile"
> poolSize="…"
> maxPoolSize="…"
> maxQueueSize="…"
> rejectedPolicy="…"/>
>
> My routes have a recipient list and an error handler (no multicast).
> I set the same thread pool profile to each error handler and recipient
> list:
> executorServiceRef="myRoutePoolProfile"
>
> Looking at the thread instantiated by the JVM (since I noticed a huge
> quantity of memory allocated increasing which grows continuously for a
> while) I have seen recipient list allocating a new thread pool with the
> configuration I provided (myRoutePoolProfile) for each route instead of
> sharing it for all the routes.
>
> My needing is to limit the threads used by all the routes and Camel engine
> itself by setting a global limit.
> In this way I can be confident to have a predictable amount of memory used
> by the JVM (once setting heap, code caching and so on)
>
> Do you have any idea or documentation to point me to?
>
> Thank for your support.
>
> Regards,
> Riccardo Modanese
>


-- 
Claus Ibsen
-
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2


Re: Global limit to threads count used by Camel (engine + routes pools + ..)

2023-08-28 Thread Modanese, Riccardo
Thank you for your reply.

I’ll try asap



unable to work with streams + Artemis large messages with AMQP factory

2023-08-28 Thread Modanese, Riccardo
Hello everyone,
 I’m working on properly supporting Artemis large messages through Camel
streams.
From the documentation it looks like this is supported only using the AMQP or
CORE protocol (no JMS).
We use a custom factory since we need to set the connection client ID.

I set up the Camel routes by adding streamCache="true" to every route
definition and Camel context definition.
I also added the stream cache configuration to each Camel context:

<streamCaching … spoolDirectory="/tmp/camel_cache" spoolThreshold="65536"/>

This configuration didn’t work using this connection factory:
org.apache.qpid.jms.JmsConnectionFactory
with this url:
public ServiceConnectionFactoryImpl(String host, int port, String 
username, String password, String clientId) {
   super(username, password, "amqp://" + host + ":" + port);
...
}
It looks like, from my understanding, that this factory is creating a JMS 
connection even if I specify the AMQP protocol in the connection url.
But this works (connect and subscribe and receive messages from the broker) 
using an Artemis connector with only AMQP protocol set (is the AMQP protocol 
acceptor also supporting JMS?)

Anyway, after changing the factory to use the CORE protocol, the routes work
fine with the Camel configuration I set.
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
with this url
public ServiceConnectionFactoryImpl(String host, int port, String 
username, String password, String clientId) {
super("tcp://" + host + ":" + port, username, password);
...
}

What am I doing wrong?
How can I configure my factory to force the AMQP protocol to be used (since
it’s supported with streams and Artemis large messages)?

Any suggestion is appreciated!

Regards

Riccardo Modanese
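
As a rough illustration of what a spool threshold such as
spoolThreshold="65536" means, here is a minimal stdlib-only sketch: a payload
is buffered in memory until it would exceed the threshold, after which it is
moved to a temporary file under the spool directory, and it can be re-read in
either case. The class SpoolingCacheSketch and its methods are hypothetical
names for this example; this is not Camel's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of threshold-based spooling; not Camel's own classes.
final class SpoolingCacheSketch {
    private final byte[] memory; // non-null when the payload stayed in memory
    private final Path file;     // non-null when the payload was spooled to disk

    private SpoolingCacheSketch(byte[] memory, Path file) {
        this.memory = memory;
        this.file = file;
    }

    // Copies the stream, switching from memory to a temp file once the threshold is exceeded.
    static SpoolingCacheSketch cache(InputStream in, Path spoolDir, int spoolThreshold) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        OutputStream out = buffer;
        Path spooled = null;
        byte[] chunk = new byte[8192];
        try {
            int n;
            while ((n = in.read(chunk)) != -1) {
                if (spooled == null && buffer.size() + n > spoolThreshold) {
                    Files.createDirectories(spoolDir);
                    spooled = Files.createTempFile(spoolDir, "spool", ".tmp");
                    spooled.toFile().deleteOnExit();
                    out = Files.newOutputStream(spooled);
                    buffer.writeTo(out); // move the bytes buffered so far to disk
                }
                out.write(chunk, 0, n);
            }
            out.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return spooled == null
                ? new SpoolingCacheSketch(buffer.toByteArray(), null)
                : new SpoolingCacheSketch(null, spooled);
    }

    boolean isSpooled() {
        return file != null;
    }

    // Each call re-reads the cached payload from the start.
    byte[] readAll() {
        try (InputStream in = memory != null
                ? new ByteArrayInputStream(memory) : Files.newInputStream(file)) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = Path.of(System.getProperty("java.io.tmpdir"), "camel_cache_sketch");
        SpoolingCacheSketch small = cache(new ByteArrayInputStream(new byte[1_000]), dir, 65_536);
        SpoolingCacheSketch large = cache(new ByteArrayInputStream(new byte[100_000]), dir, 65_536);
        System.out.println("small spooled: " + small.isSpooled());
        System.out.println("large spooled: " + large.isSpooled());
        System.out.println("large re-read: " + large.readAll().length + ", " + large.readAll().length);
    }
}
```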


Re: unable to work with streams + Artemis large messages with AMQP factory

2023-08-28 Thread Federico Mariani
Hello,

In the camel-amqp component there are tests that use Artemis AMQP
https://github.com/apache/camel/blob/main/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java#L44
and the connection is configured like this
https://github.com/apache/camel/blob/main/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java#L64
the AMQPConnectionDetails is later used to configure a
org.apache.qpid.jms.JmsConnectionFactory.

If you have a reproducer I am happy to help more.

Regards,
Federico

On Mon, 28 Aug 2023 at 12:14, Modanese, Riccardo wrote:

> Hello everyone,
>  I’m working on supporting in a proper way the Artemis large messages
> through Camel streams.
> From the documentation it looks like this is supported only using AMQP or
> CORE protocol (no JMS).
> We use a custom factory since we need to set the connection client ID.
>
> I set up Camel routes adding streamCache="true" to every route definition
> and camel context definition.
> I also added the stream cache configuration to each Camel context:
>
> <streamCaching … spoolDirectory="/tmp/camel_cache" spoolThreshold="65536"/>
>
> This configuration didn’t work using this connection factory:
> org.apache.qpid.jms.JmsConnectionFactory
> with this url:
> public ServiceConnectionFactoryImpl(String host, int port, String
> username, String password, String clientId) {
>super(username, password, "amqp://" + host + ":" + port);
> ...
> }
> It looks like, from my understanding, that this factory is creating a JMS
> connection even if I specify the AMQP protocol in the connection url.
> But this works (connect and subscribe and receive messages from the
> broker) using an Artemis connector with only AMQP protocol set (is the AMQP
> protocol acceptor also supporting JMS?)
>
> Anyway, changing the factory to use CORE protocol routes work fine with
> the Camel configuration I set.
> org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
> with this url
> public ServiceConnectionFactoryImpl(String host, int port, String
> username, String password, String clientId) {
> super("tcp://" + host + ":" + port, username, password);
> ...
> }
>
> What I’m doing wrong?
> How can I configure my factory to force AMQP protocol to be used (since
> it’ supported with streams and Artemis large messages)
>
> Any suggestion is appreciated!
>
> Regards
>
> Riccardo Modanese
>


Re: How to use RoutePolicy to prevent a route from starting

2023-08-28 Thread jacek szymanski

Hi,

wouldn't it be enough to disable auto startup on the backup route?


https://camel.apache.org/manual/configuring-route-startup-ordering-and-autostartup.html


js.




Re: unable to work with streams + Artemis large messages with AMQP factory

2023-08-28 Thread Modanese, Riccardo
Thanks for your reply, I'm going to investigate the links you provided.

In the meantime, I can provide a few links to the code.
The factory I originally used was (amqp???)
https://github.com/eclipse/kapua/blob/develop/service/client/src/main/java/org/eclipse/kapua/service/client/amqp/ServiceConnectionFactoryImpl.java#L24
https://github.com/eclipse/kapua/blob/fix-largeMessages/consumer/telemetry-app/src/main/resources/spring/applicationContext.xml#L48

then I changed it to (core):
https://github.com/eclipse/kapua/blob/fix-largeMessages/service/client/src/main/java/org/eclipse/kapua/service/client/amqp/ServiceConnectionFactoryImpl.java#L24

Off topic, since it involves Artemis, but just to give you a bit more context:
the clients are MQTT. I’m still unable to change the default
minLargeMessageSize. I tried adding this parameter to all the acceptors and to
the AMQP/CORE factory, but it doesn’t work; the default value is kept.

Regards,

Riccardo




Re: unable to work with streams + Artemis large messages with AMQP factory

2023-08-28 Thread Federico Mariani
I am afraid that in order to use minLargeMessageSize you need to use the
*camel-jms* component and *artemis-jms-client* as the JMS implementation,
because camel-amqp uses *qpid-jms-client*. If you have some time, you can
take a look at the camel-jms tests and the Artemis test infra.





Re: unable to work with streams + Artemis large messages with AMQP factory

2023-08-28 Thread Modanese, Riccardo
Sorry, I’m a bit confused!

To be able to set minLargeMessageSize, should I use *camel-jms*? There’s no
other way, right?

My implementation injects a factory into the *camel-amqp* endpoint, so I
assumed I could change the connection driver without affecting *camel-amqp*
functionality.
But from what I understand, I was wrong.
That configuration worked, but I couldn’t change minLargeMessageSize, and it’s
mandatory for it to be configurable in my case.

I opted for the AMQP protocol because I thought it was a preferable choice for
an open source project compared to JMS or CORE.

But if there is no other way, as I understand it, I need to switch to JMS.

I’ll try this way soon!




Registration open for Community Over Code North America

2023-08-28 Thread Rich Bowen
Hello! Registration is still open for the upcoming Community Over Code
NA event in Halifax, NS! We invite you to register for the event
https://communityovercode.org/registration/

Apache Committers, note that you have a special discounted rate for the
conference at US$250. To take advantage of this rate, use the special
code sent to the committers@ list by Brian Proffitt earlier this month.

If you are in need of an invitation letter, please consult the
information at https://communityovercode.org/visa-letter/

Please see https://communityovercode.org/ for more information about
the event, including how to make reservations for discounted hotel
rooms in Halifax. Discounted rates will only be available until Sept.
5, so reserve soon!

--Rich, for the event planning team


Re: Camel 4.0 upgrade causes SEDA Hang

2023-08-28 Thread Otavio Rodolfo Piske
Hi, I'd like to investigate ... but for that I need a full reproducer. Can
you provide* one, please?

* Either as a test in the Camel Core code base or one that I can quickly
clone and run.

Thanks

On Thu, Aug 24, 2023 at 6:54 PM Tim Janusz  wrote:

> Hi all,
>
> We're currently trying to upgrade our Camel from 3.20.2, spring boot 2.7.12
> running on Java 11 to spring.boot.version=3.1.2 and camel.version=4.0.0
> running on Java 17.
>
> We are seeing weird behaviour in one of our applications where our SEDA
> component just 'hangs' after a specific set of interactions involving
> dynamically adding/removing routes via the dynamicRouter feature.
>
> Some notes:
>  - We use SEDA component (where things seem to hang)
>  - flow is basically:
>   initialize camel,
>   process msg1 to create dynamic route and process message on it,
>   process msg2 which deletes route,
>   process msg3 which re-creates route and processes it,
>   process msg4 which deletes route,
>   process msg5 which re-creates route and HANGS at this point
>  - The 2nd time we delete the route and re-create it (with the same name
> as before) is when we see processing 'hang' (super weird)
>  - This only happens in version 4.x; the same code works fine in 3.x
>
> We're really wondering what could've changed between version 3.x and 4.x
> that causes this now to hang?
>
> I've included some pseudocode below with log snippets to help clarify
>
> Our main RouteBuilder creates these two
> from("servlet:dialcommand")
>   .routeId("dialCommandServletRoute")
>   .convertBodyTo(String::class.java)
>   .to("seda:dialcommandqueue?timeout=$commandTimeout")
>   .removeHeader("args")
>
>
>
> from("seda:dialcommandqueue?concurrentConsumers=$concurrentConsumerCount&timeout=$commandTimeout")
>   .setExchangePattern(ExchangePattern.InOut)
>   .routeId("dialcommandprocess")
>   .dynamicRouter { it: Exchange -> dialDynamicRouting(it) }
>
> Our dynamic router logic is like this:
> fun dialDynamicRouting(exchange: Exchange): String? {
>
>   // when we need to kill a route
>   is KillRoute -> {
>     context.routeController.stopRoute(routeId)
>     context.removeRoute(routeId)
>
>     exchange.`in`.body = success("OK", exchange.`in`.getHeader("id", String::class.java))
>     return null
>
>   // when we add a new route dynamically
>   val dialCommandRoute = createDialCommandRoute(routeId, contextId, contextKey, concurrency)
>   // add to camel context
>   context.addRoutes(dialCommandRoute)
>   context.getRoute(routeId).properties["contextId"] = contextId
>   return dialCommandRoute.from
>
> The route that "createDialCommandRoute" builds for the dynamic router uses
> this type of code:
> from(from)
>   .routeId(routeId)
>   .process {
>     val message = it.`in`
>     val body = message.body
>     if (body is DialCommand<*>) {
>       body.id = message.getHeader("id", String::class.java) ?: body.standardOptions.id ?: body.id
>       body.configureLogContext()
>     }
>   }.id("[$routeId] prepare command object")
>   .log(LoggingLevel.INFO, "[\${headers.id}] \${body.procName}")
>   .process { it: Exchange -> it.`in`.body = run(it) }
>   }
>
> With DEBUG level logging enabled we can see that it hangs right at the point
> where it would normally start processing items off that newly created route.
>
> Log Sample 1: Example logs of a valid run where it creates the new route
> and starts processing it
> 2023-08-22  INFO 54168 --- [ialcommandqueue]
> c.g.dm.automation.etl.route.Operator : Route dialCommandRoute-6fddd8cf
> does not exist. This command (RecoverUnfinishedOperations) will create the
> route.
> 2023-08-22  INFO 54168 --- [ialcommandqueue]
> c.g.dm.automation.etl.route.Operator : Creating route:
> dialCommandRoute-6fddd8cf
> 2023-08-22 DEBUG 54168 --- [ialcommandqueue]
> o.a.c.impl.engine.AbstractCamelContext   :
> seda://dialcommand:6fddd8cf?concurrentConsumers=8&timeout=0 converted to
> endpoint: seda://dialcommand:6fddd8cf?concurrentConsumers=8&timeout=0 by
> component: org.apache.camel.component.seda.SedaComponent@4a23350
> 2023-08-22 DEBUG 54168 --- [ialcommandqueue]
> o.a.c.i.e.InternalRouteStartupManager: Warming up route id:
> dialCommandRoute-6fddd8cf having autoStartup=true
> 2023-08-22 DEBUG 54168 --- [ialcommandqueue]
> o.a.c.i.e.InternalRouteStartupManager: Route: dialCommandRoute-6fddd8cf
> >>> Route[seda://dialcommand:6fddd8cf?concurrentConsumers=8&timeout=0 ->
> null]
> 2023-08-22 DEBUG 54168 --- [ialcommandqueue]
> o.a.c.i.e.InternalRouteStartupManager: Starting consumer (order: 1004)
> on route: dialCommandRoute-6fddd8cf
> 2023-08-22 DEBUG 54168 --- [ialcommandqueue]
> o.a.c.i.e.BaseExecutorServiceManager : Created new ThreadPool for
> source:
> Consumer[seda://dialcommand:6fddd8cf?concurrentConsumers=8&timeout=0] with
> name: seda://dialcommand:6fddd8cf?concurrentConsumers=8&timeout=0. ->
> org.apache.camel.util.concurren

Re: Camel 4.0 upgrade causes SEDA Hang

2023-08-28 Thread Tim Janusz
Thanks so much..

Give me a little time and I'll create a smaller app to demonstrate what
we're seeing..

I'll reply once I have something available.


On Mon, Aug 28, 2023 at 4:30 PM Otavio Rodolfo Piske 
wrote:

> Hi, I'd like to investigate ... but for that I need a full reproducer. Can
> you provide* one, please?
>
> * Either as a test in the Camel Core code base or one that I can quickly
> clone and run.
>
> Thanks

Re: How to upload data to S3 as Multipart file while getting data in loopDoWhile EIP from Paginated API

2023-08-28 Thread Claudio Miranda
> I want to upload all the data received from all pages of the paginated API
> as a single file in S3 (multipart file).

Camel S3 supports multipart upload to S3
https://camel.apache.org/components/3.21.x/aws2-s3-component.html#_s3_producer_operation_examples


-- 
  Claudio Miranda

clau...@claudius.com.br
http://www.claudius.com.br


RE: How to use RoutePolicy to prevent a route from starting

2023-08-28 Thread Han Yainsun
Hi Jacek,

Thanks for the advice! Yes, this feature meets my needs.


From: jacek szymanski
Sent: 28 August 2023 20:29
To: users@camel.apache.org
Subject: Re: How to use RoutePolicy to prevent a route from starting

Hi,

wouldn't it be enough to disable auto startup on the backup route?


https://camel.apache.org/manual/configuring-route-startup-ordering-and-autostartup.html
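
A minimal sketch of what that could look like in the Java DSL, assuming the two routes are defined in a RouteBuilder and that camel-core and camel-jetty are on the classpath. The route IDs ("masterRoute", "backupRoute"), the "direct:" backend endpoints and the switchToBackup helper are hypothetical placeholders, not taken from the original application; only the jetty URI comes from the error message in the thread.

```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;

public class MasterBackupRoutes extends RouteBuilder {

    // Listening URI copied from the error message in the thread.
    private static final String LISTEN_URI =
            "jetty:http://0.0.0.0:7806/esb00/helmjson?httpMethodRestrict=POST&matchOnUriPrefix=true";

    @Override
    public void configure() {
        // Master route: started together with the CamelContext (default behaviour).
        from(LISTEN_URI)
            .routeId("masterRoute")          // hypothetical route id
            .to("direct:masterBackend");     // hypothetical backend endpoint

        // Backup route: same listening URI, but its consumer is not started at startup.
        from(LISTEN_URI)
            .routeId("backupRoute")          // hypothetical route id
            .autoStartup(false)
            .to("direct:backupBackend");     // hypothetical backend endpoint
    }

    // Hypothetical helper for a manual switch-over: stop the master first so the
    // shared endpoint is free, then start the backup.
    public static void switchToBackup(CamelContext context) throws Exception {
        context.getRouteController().stopRoute("masterRoute");
        context.getRouteController().startRoute("backupRoute");
    }
}
```

The order in switchToBackup matters under this assumption: starting the backup while the master consumer is still running would presumably hit the same "Multiple consumers for the same endpoint" check.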


js.



On 28.08.2023 9:31, Han Yainsun wrote:
> Hi Camel Community,
>
> Greetings to you!
>
> Camel version: 3.20.4
> Spring Boot version: 2.7.11
> JDK: Amazon Corretto 17
> Platform: Windows server 2019
> IDE: IntelliJ IDEA 2021.3.2 (Community Edition)
> Stack Traces & logging output: attached at the end
>
> I have a case where two routes use the same HTTP listening URL but different 
> backends: one is the master and the other is the backup. Normally, when Camel 
> starts, only the master route should be started and the backup route should 
> stay stopped. When the master route's backend has an issue, we can manually 
> start the backup route and stop the master route.
>
> The problem is that during startup Camel tries to start all routes and 
> fails with the error below:
>
> ---
>   Application run 
> failed","logger_name":"org.springframework.boot.SpringApplication","thread_name":"main","level":"ERROR","level_value":4,"stack_trace":"org.apache.camel.FailedToStartRouteException:
>  Failed to start route SMS_JSON_MsgFlow because of Multiple consumers for the 
> same endpoint is not allowed: 
> jetty:http://0.0.0.0:7806/esb00/helmjson?httpMethodRestrict=POST&matchOnUriPrefix=true
> ---
>
> I read the Camel documentation and it seems RoutePolicy[1] could be the 
> solution, so I took the class ThrottlingInflightRoutePolicy as a reference, 
> extended the class RoutePolicySupport, and overrode the method onStart as 
> below. However, it does not seem to work: I can see it print the log 
> "stop omni channel interface: " on the console, but the CamelContext still 
> tries to start the route and runs into the same issue.
>
> Could you kindly advise on this? Thanks in advance!
>
> ---
>  @Override
>  public void onStart(Route route) {
>      System.out.println("starting route: " + route.getRouteId());
>      boolean startOmniChannel = Boolean.parseBoolean(String.valueOf(
>              camelContext.getPropertiesComponent().resolveProperty("start_omni_channel")));
>      if (!startOmniChannel) {
>          try {
>              System.out.println("stop omni channel interface: " + route.getRouteId());
>              suspendOrStopConsumer(route.getConsumer());
>          } catch (Exception e) {
>              e.printStackTrace();
>          }
>      }
>  }
> ---
>
> Stack Traces & logging output:
> ---
> {"@timestamp":"2023-08-28T10:58:07.52+08:00","@version":"1","message":"Apache 
> Camel 3.20.4 (mini-esb) is 
> starting","logger_name":"org.apache.camel.impl.engine.AbstractCamelContext","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:07.62+08:00","@version":"1","message":"Starting
>  CamelMainRunController to ensure the main thread keeps 
> running","logger_name":"org.apache.camel.spring.boot.CamelSpringBootApplicationListener","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:07.622+08:00","@version":"1","message":"Apache
>  Camel (Main) 3.20.4 is 
> starting","logger_name":"org.apache.camel.main.MainSupport","thread_name":"CamelMainRunController","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:07.642+08:00","@version":"1","message":"Logging
>  initialized @5011ms to 
> org.eclipse.jetty.util.log.Slf4jLog","logger_name":"org.eclipse.jetty.util.log","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:08.1+08:00","@version":"1","message":"jetty-9.4.51.v20230217;
>  built: 2023-02-17T08:19:37.309Z; git: 
> b45c405e4544384de066f814ed42ae3dceacdd49; jvm 
> 17.0.7+7-LTS","logger_name":"org.eclipse.jetty.server.Server","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:08.53+08:00","@version":"1","message":"Started
>  
> o.e.j.s.ServletContextHandler@4c635edc{/,null,AVAILABLE}","logger_name":"org.eclipse.jetty.server.handler.ContextHandler","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:08.994+08:00","@version":"1","message":"Started
>  ServerConnector@3009eed7{HTTP/1.1, 
> (http/1.1)}{0.0.0.0:7806}","logger_name":"org.eclipse.jetty.server.AbstractConnector","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:08.994+08:00","@version":"1","message":"Started
>  
> @6363ms","logger_name":"org.eclipse.jetty.server.Server","thread_name":"main","level":"INFO","level_value":2}
> {"@timestamp":"2023-08-28T10:58:09.011+08:00","@version":"1","message":"Error 
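
A side note on the onStart snippet quoted above, offered as a hedged observation rather than a diagnosis of the startup failure: assuming resolveProperty returns an Optional<String> (as the Camel 3.x PropertiesComponent interface declares), wrapping it in String.valueOf produces text such as "Optional[true]", which Boolean.parseBoolean always reads as false. The plain-Java sketch below shows the difference; the class and method names are hypothetical and it does not use Camel at all.

```java
import java.util.Optional;

public class OptionalFlagSketch {

    // Mirrors the expression in the quoted snippet: String.valueOf on an Optional
    // yields "Optional[true]" (or "Optional.empty"), never the bare "true".
    static boolean parseViaStringValueOf(Optional<String> value) {
        return Boolean.parseBoolean(String.valueOf(value));
    }

    // Unwraps the Optional first and falls back to false when the property is absent.
    static boolean parseViaMap(Optional<String> value) {
        return value.map(Boolean::parseBoolean).orElse(false);
    }

    public static void main(String[] args) {
        Optional<String> configured = Optional.of("true");
        System.out.println(parseViaStringValueOf(configured)); // false
        System.out.println(parseViaMap(configured));           // true
    }
}
```

If that assumption holds, the flag in the quoted policy would be false regardless of the configured value, which would be consistent with the "stop omni channel interface" message always being printed.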

How to use component Flink in Karavan?

2023-08-28 Thread Xu Bill
Hello,

I am trying to make a Camel integration in Karavan for communicating with 
Apache Flink.

I have read the documents here 
(https://camel.apache.org/components/4.0.x/flink-component.html).
There are some code samples for the DataSet callback and others.
But I don't know how to do these things in Karavan.
Could you give me some hints or samples for Karavan?

I am using VS Code and Camel extension to develop Camel integrations.

Thanks!
Best regards,
Bill