This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 87c8e832e9f remote vs local counter to better observe how many
internal and external messages is being processed (#14617)
87c8e832e9f is described below
commit 87c8e832e9f73ae1f33867085f82b9303bb69939
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Jun 22 15:34:59 2024 +0200
remote vs local counter to better observe how many internal and external
messages is being processed (#14617)
CAMEL-20879: camel-core: remote vs local endpoints counters
---
.../org/apache/camel/spi/InflightRepository.java | 5 ++
.../impl/engine/DefaultInflightRepository.java | 8 +++
.../camel/impl/console/ConsumerDevConsole.java | 10 ++--
.../camel/impl/console/ContextDevConsole.java | 18 ++++---
.../camel/impl/console/EndpointDevConsole.java | 6 ++-
.../apache/camel/impl/console/InflightConsole.java | 6 ++-
.../apache/camel/impl/console/RouteDevConsole.java | 2 +
.../management/mbean/ManagedCamelContextMBean.java | 12 +++++
.../api/management/mbean/ManagedConsumerMBean.java | 3 ++
.../api/management/mbean/ManagedEndpointMBean.java | 2 +-
.../api/management/mbean/ManagedProducerMBean.java | 3 ++
.../api/management/mbean/ManagedRouteMBean.java | 3 ++
.../management/mbean/ManagedCamelContext.java | 63 ++++++++++++++++++++++
.../camel/management/mbean/ManagedConsumer.java | 5 ++
.../camel/management/mbean/ManagedProducer.java | 4 ++
.../camel/management/mbean/ManagedRoute.java | 8 +++
.../core/commands/process/CamelContextStatus.java | 44 +++++++++++++--
.../core/commands/process/CamelRouteStatus.java | 23 ++++++--
.../jbang/core/commands/process/CamelRouteTop.java | 5 +-
.../jbang/core/commands/process/ListInflight.java | 15 ++++++
.../jbang/core/commands/process/ListProcess.java | 44 +++++++++++++--
21 files changed, 260 insertions(+), 29 deletions(-)
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
index 9c837ad8a92..a4704984055 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -58,6 +58,11 @@ public interface InflightRepository extends StaticService {
*/
String getFromRouteId();
+ /**
+ * Whether the endpoint is remote where the exchange originates
(started)
+ */
+ boolean isFromRemoteEndpoint();
+
/**
* The id of the route where the exchange currently is being processed
* <p/>
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
index 53cd68e0187..c606fb47730 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
@@ -261,6 +261,14 @@ public class DefaultInflightRepository extends
ServiceSupport implements Infligh
return exchange.getFromRouteId();
}
+ @Override
+ public boolean isFromRemoteEndpoint() {
+ if (exchange.getFromEndpoint() != null) {
+ return exchange.getFromEndpoint().isRemote();
+ }
+ return false;
+ }
+
@Override
public String getAtRouteId() {
return ExchangeHelper.getAtRouteId(exchange);
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
index 858edf525a8..8ded7df6a0d 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
@@ -63,6 +63,8 @@ public class ConsumerDevConsole extends AbstractDevConsole {
sb.append(String.format("\n Uri: %s",
mc.getEndpointUri()));
sb.append(String.format("\n State: %s", mc.getState()));
sb.append(String.format("\n Class: %s",
mc.getServiceType()));
+ sb.append(String.format("\n Remote: %b",
mc.isRemoteEndpoint()));
+ sb.append(String.format("\n Hosted: %b",
mc.isHostedService()));
sb.append(String.format("\n Inflight: %d", inflight));
if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) {
sb.append(String.format("\n Polling: %s",
mpc.isPolling()));
@@ -74,9 +76,9 @@ public class ConsumerDevConsole extends AbstractDevConsole {
sb.append(String.format("\n Greedy: %s",
mpc.isGreedy()));
sb.append(String.format("\n Running Logging Level:
%s", mpc.getRunningLoggingLevel()));
sb.append(String.format("\n Send Empty Message When
Idle: %s", mpc.isSendEmptyMessageWhenIdle()));
- sb.append(String.format("\n Counter(total: %d
success: %d error: %d)",
+ sb.append(String.format("\n Counter (total: %d
success: %d error: %d)",
mpc.getCounter(), mpc.getSuccessCounter(),
mpc.getErrorCounter()));
- sb.append(String.format("\n Delay(initial: %d
delay: %d unit: %s)",
+ sb.append(String.format("\n Delay (initial: %d
delay: %d unit: %s)",
mpc.getInitialDelay(), mpc.getDelay(),
mpc.getTimeUnit()));
sb.append(String.format(
"\n Backoff(counter: %d multiplier: %d
errorThreshold: %d, idleThreshold: %d )",
@@ -113,7 +115,7 @@ public class ConsumerDevConsole extends AbstractDevConsole {
sb.append(String.format("\n Repeat
Count: %s", repeatCount));
}
sb.append(String.format("\n Running Logging
Level: %s", runLoggingLevel));
- sb.append(String.format("\n Counter(total:
%s)", counter));
+ sb.append(String.format("\n Counter (total:
%s)", counter));
}
} catch (Exception e) {
@@ -150,6 +152,8 @@ public class ConsumerDevConsole extends AbstractDevConsole {
jo.put("uri", mc.getEndpointUri());
jo.put("state", mc.getState());
jo.put("class", mc.getServiceType());
+ jo.put("remote", mc.isRemoteEndpoint());
+ jo.put("hosted", mc.isHostedService());
jo.put("inflight", inflight);
jo.put("scheduled", false);
if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) {
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
index a4bbdc3f395..5981c5d7a8d 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
@@ -40,12 +40,13 @@ public class ContextDevConsole extends AbstractDevConsole {
protected String doCallText(Map<String, Object> options) {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("Apache Camel %s %s (%s) uptime %s",
getCamelContext().getVersion(),
- getCamelContext().getStatus().name().toLowerCase(Locale.ROOT),
getCamelContext().getName(),
- CamelContextHelper.getUptime(getCamelContext())));
+ String profile = "";
if (getCamelContext().getCamelContextExtension().getProfile() != null)
{
- sb.append(String.format("\n Profile: %s",
getCamelContext().getCamelContextExtension().getProfile()));
+ profile = " (profile: " +
getCamelContext().getCamelContextExtension().getProfile() + ")";
}
+ sb.append(String.format("Apache Camel %s %s (%s)%s uptime %s",
getCamelContext().getVersion(),
+ getCamelContext().getStatus().name().toLowerCase(Locale.ROOT),
getCamelContext().getName(),
+ profile, CamelContextHelper.getUptime(getCamelContext())));
if (getCamelContext().getDescription() != null) {
sb.append(String.format("\n %s",
getCamelContext().getDescription()));
}
@@ -70,9 +71,9 @@ public class ContextDevConsole extends AbstractDevConsole {
if (!thp.isEmpty()) {
sb.append(String.format("\n Messages/Sec: %s", thp));
}
- sb.append(String.format("\n Total: %s",
mb.getExchangesTotal()));
- sb.append(String.format("\n Failed: %s",
mb.getExchangesFailed()));
- sb.append(String.format("\n Inflight: %s",
mb.getExchangesInflight()));
+ sb.append(String.format("\n Total: %s/%s",
mb.getRemoteExchangesTotal(), mb.getExchangesTotal()));
+ sb.append(String.format("\n Failed: %s/%s",
mb.getRemoteExchangesFailed(), mb.getExchangesFailed()));
+ sb.append(String.format("\n Inflight: %s/%s",
mb.getRemoteExchangesInflight(), mb.getExchangesInflight()));
long idle = mb.getIdleSince();
if (idle > 0) {
sb.append(String.format("\n Idle Since: %s",
TimeUtils.printDuration(idle)));
@@ -151,6 +152,9 @@ public class ContextDevConsole extends AbstractDevConsole {
stats.put("exchangesTotal", mb.getExchangesTotal());
stats.put("exchangesFailed", mb.getExchangesFailed());
stats.put("exchangesInflight", mb.getExchangesInflight());
+ stats.put("remoteExchangesTotal",
mb.getRemoteExchangesTotal());
+ stats.put("remoteExchangesFailed",
mb.getRemoteExchangesFailed());
+ stats.put("remoteExchangesInflight",
mb.getRemoteExchangesInflight());
stats.put("reloaded", reloaded);
stats.put("meanProcessingTime", mb.getMeanProcessingTime());
stats.put("maxProcessingTime", mb.getMaxProcessingTime());
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
index 9f5331f4cf6..6475ec32ff0 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
@@ -54,6 +54,7 @@ public class EndpointDevConsole extends AbstractDevConsole {
if (!col.isEmpty()) {
for (Endpoint e : col) {
boolean stub =
e.getComponent().getClass().getSimpleName().equals("StubComponent");
+ boolean remote = e.isRemote();
String uri = e.toString();
if (!uri.startsWith("stub:") && stub) {
// shadow-stub
@@ -62,9 +63,10 @@ public class EndpointDevConsole extends AbstractDevConsole {
var stat = findStats(stats, e.getEndpointUri());
if (stat.isPresent()) {
var st = stat.get();
- sb.append(String.format("\n %s (direction: %s, usage:
%s)", uri, st.getDirection(), st.getHits()));
+ sb.append(String.format("\n %s (remote: %s direction:
%s, usage: %s)", uri, remote, st.getDirection(),
+ st.getHits()));
} else {
- sb.append(String.format("\n %s", uri));
+ sb.append(String.format("\n %s (remote: %s)", uri,
remote));
}
}
}
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
index 3ad4d5f2583..ad7f6f31b47 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
@@ -57,8 +57,9 @@ public class InflightConsole extends AbstractDevConsole {
if (repo.isInflightBrowseEnabled()) {
for (InflightRepository.InflightExchange ie : repo.browse(filter,
max, false)) {
String age = TimeUtils.printDuration(ie.getDuration(), true);
- sb.append(String.format("\n %s (from: %s at: %s/%s age:
%s)",
- ie.getExchange().getExchangeId(), ie.getFromRouteId(),
ie.getAtRouteId(), ie.getNodeId(), age));
+ sb.append(String.format("\n %s (from: %s at: %s/%s remote:
%b age: %s)",
+ ie.getExchange().getExchangeId(), ie.getFromRouteId(),
ie.getAtRouteId(), ie.getNodeId(),
+ ie.isFromRemoteEndpoint(), age));
}
}
@@ -82,6 +83,7 @@ public class InflightConsole extends AbstractDevConsole {
JsonObject props = new JsonObject();
props.put("exchangeId", ie.getExchange().getExchangeId());
props.put("fromRouteId", ie.getFromRouteId());
+ props.put("fromRemoteEndpoint", ie.isFromRemoteEndpoint());
props.put("atRouteId", ie.getAtRouteId());
props.put("nodeId", ie.getNodeId());
props.put("elapsed", ie.getElapsed());
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
index 27ad2f51fe4..59d022b5440 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
@@ -87,6 +87,7 @@ public class RouteDevConsole extends AbstractDevConsole {
sb.append(String.format(" Node Prefix Id: %s",
mrb.getNodePrefixId()));
}
sb.append(String.format("\n From: %s", mrb.getEndpointUri()));
+ sb.append(String.format("\n Remote: %s",
mrb.isRemoteEndpoint()));
if (mrb.getSourceLocation() != null) {
sb.append(String.format("\n Source: %s",
mrb.getSourceLocation()));
}
@@ -233,6 +234,7 @@ public class RouteDevConsole extends AbstractDevConsole {
jo.put("nodePrefixId", mrb.getNodePrefixId());
}
jo.put("from", mrb.getEndpointUri());
+ jo.put("remote", mrb.isRemoteEndpoint());
if (mrb.getSourceLocation() != null) {
jo.put("source", mrb.getSourceLocation());
}
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
index 2e94ff19b99..a2b8485f749 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
@@ -129,6 +129,18 @@ public interface ManagedCamelContextMBean extends
ManagedPerformanceCounterMBean
@ManagedAttribute(description = "Throughput message/second")
String getThroughput();
+ @ManagedAttribute(description = "Total number of exchanges processed from
remote endpoints only")
+ long getRemoteExchangesTotal();
+
+ @ManagedAttribute(description = "Completed (success) number of exchanges
processed from remote endpoints only")
+ long getRemoteExchangesCompleted();
+
+ @ManagedAttribute(description = "Failed number of exchanges processed from
remote endpoints only")
+ long getRemoteExchangesFailed();
+
+ @ManagedAttribute(description = "Total number of exchanges inflight from
remote endpoints only")
+ long getRemoteExchangesInflight();
+
@ManagedAttribute(description = "Whether breadcrumbs is in use")
boolean isUseBreadcrumb();
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
index 96404ade564..1cbb9fdace7 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
@@ -29,4 +29,7 @@ public interface ManagedConsumerMBean extends
ManagedServiceMBean {
@ManagedAttribute(description = "Whether this consumer hosts a service
such as acting as a HTTP server (only available for some components)")
boolean isHostedService();
+ @ManagedAttribute(description = "Whether this consumer connects to remote
or local systems")
+ boolean isRemoteEndpoint();
+
}
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
index dcf0a15337c..a71b4773bde 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
@@ -37,7 +37,7 @@ public interface ManagedEndpointMBean {
@ManagedAttribute(description = "Singleton")
boolean isSingleton();
- @ManagedAttribute(description = "Remote")
+ @ManagedAttribute(description = "Whether this endpoint connects to remote
or local systems")
boolean isRemote();
@ManagedAttribute(description = "Endpoint State")
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
index 4f40b4b00c0..11f34f096e4 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
@@ -26,4 +26,7 @@ public interface ManagedProducerMBean extends
ManagedServiceMBean {
@ManagedAttribute(description = "Singleton")
boolean isSingleton();
+ @ManagedAttribute(description = "Whether this producer connects to remote
or local systems")
+ boolean isRemoteEndpoint();
+
}
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index e22e393b6a5..514230d8e83 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -178,4 +178,7 @@ public interface ManagedRouteMBean extends
ManagedPerformanceCounterMBean {
@ManagedAttribute(description = "Whether update route from XML is enabled")
boolean isUpdateRouteEnabled();
+ @ManagedAttribute(description = "Whether the consumer connects to remote
or local systems")
+ boolean isRemoteEndpoint();
+
}
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index 78fb78127b5..a7e838b3fe9 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -64,6 +64,10 @@ public class ManagedCamelContext extends
ManagedPerformanceCounter implements Ti
private final String jmxDomain;
private final boolean includeRouteTemplates;
private final boolean includeKamelets;
+ private Statistic remoteExchangesTotal;
+ private Statistic remoteExchangesCompleted;
+ private Statistic remoteExchangesFailed;
+ private Statistic remoteExchangesInflight;
public ManagedCamelContext(CamelContext context) {
this.context = context;
@@ -75,11 +79,24 @@ public class ManagedCamelContext extends
ManagedPerformanceCounter implements Ti
@Override
public void init(ManagementStrategy strategy) {
super.init(strategy);
+ this.remoteExchangesTotal = new StatisticCounter();
+ this.remoteExchangesCompleted = new StatisticCounter();
+ this.remoteExchangesFailed = new StatisticCounter();
+ this.remoteExchangesInflight = new StatisticCounter();
boolean enabled = context.getManagementStrategy().getManagementAgent()
!= null
&&
context.getManagementStrategy().getManagementAgent().getStatisticsLevel() !=
ManagementStatisticsLevel.Off;
setStatisticsEnabled(enabled);
}
+ @Override
+ public void reset() {
+ super.reset();
+ remoteExchangesTotal.reset();
+ remoteExchangesCompleted.reset();
+ remoteExchangesFailed.reset();
+ remoteExchangesInflight.reset();
+ }
+
@Override
public void completedExchange(Exchange exchange, long time) {
// the camel-context mbean is triggered for every route mbean
@@ -91,9 +108,19 @@ public class ManagedCamelContext extends
ManagedPerformanceCounter implements Ti
int level = uow.routeStackLevel(includeRouteTemplates,
includeKamelets);
if (level <= 1) {
super.completedExchange(exchange, time);
+ if (exchange.getFromEndpoint() != null &&
exchange.getFromEndpoint().isRemote()) {
+ remoteExchangesTotal.increment();
+ remoteExchangesCompleted.increment();
+ remoteExchangesInflight.decrement();
+ }
}
} else {
super.completedExchange(exchange, time);
+ if (exchange.getFromEndpoint() != null &&
exchange.getFromEndpoint().isRemote()) {
+ remoteExchangesTotal.increment();
+ remoteExchangesCompleted.increment();
+ remoteExchangesInflight.decrement();
+ }
}
}
@@ -108,9 +135,19 @@ public class ManagedCamelContext extends
ManagedPerformanceCounter implements Ti
int level = uow.routeStackLevel(includeRouteTemplates,
includeKamelets);
if (level <= 1) {
super.failedExchange(exchange);
+ if (exchange.getFromEndpoint() != null &&
exchange.getFromEndpoint().isRemote()) {
+ remoteExchangesTotal.increment();
+ remoteExchangesFailed.increment();
+ remoteExchangesInflight.decrement();
+ }
}
} else {
super.failedExchange(exchange);
+ if (exchange.getFromEndpoint() != null &&
exchange.getFromEndpoint().isRemote()) {
+ remoteExchangesTotal.increment();
+ remoteExchangesFailed.increment();
+ remoteExchangesInflight.decrement();
+ }
}
}
@@ -125,9 +162,15 @@ public class ManagedCamelContext extends
ManagedPerformanceCounter implements Ti
int level = uow.routeStackLevel(includeRouteTemplates,
includeKamelets);
if (level <= 1) {
super.processExchange(exchange, type);
+ if (exchange.getFromEndpoint() != null &&
exchange.getFromEndpoint().isRemote()) {
+ remoteExchangesInflight.increment();
+ }
}
} else {
super.processExchange(exchange, type);
+ if (exchange.getFromEndpoint() != null &&
exchange.getFromEndpoint().isRemote()) {
+ remoteExchangesInflight.increment();
+ }
}
}
@@ -330,6 +373,26 @@ public class ManagedCamelContext extends
ManagedPerformanceCounter implements Ti
}
}
+ @Override
+ public long getRemoteExchangesTotal() {
+ return remoteExchangesTotal.getValue();
+ }
+
+ @Override
+ public long getRemoteExchangesCompleted() {
+ return remoteExchangesCompleted.getValue();
+ }
+
+ @Override
+ public long getRemoteExchangesFailed() {
+ return remoteExchangesFailed.getValue();
+ }
+
+ @Override
+ public long getRemoteExchangesInflight() {
+ return remoteExchangesInflight.getValue();
+ }
+
@Override
public boolean isUseBreadcrumb() {
return context.isUseBreadcrumb();
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
index 5766b4c4e25..f4ea8da0545 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
@@ -56,4 +56,9 @@ public class ManagedConsumer extends ManagedService
implements ManagedConsumerMB
}
return false;
}
+
+ @Override
+ public boolean isRemoteEndpoint() {
+ return consumer.getEndpoint().isRemote();
+ }
}
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
index 1d08d966ff7..94ce8c0d929 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
@@ -44,4 +44,8 @@ public class ManagedProducer extends ManagedService
implements ManagedProducerMB
return producer.isSingleton();
}
+ @Override
+ public boolean isRemoteEndpoint() {
+ return producer.getEndpoint().isRemote();
+ }
}
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index 151ae524b0f..ecbed4766b1 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -733,6 +733,14 @@ public class ManagedRoute extends
ManagedPerformanceCounter implements TimerList
return enabled != null ? enabled : false;
}
+ @Override
+ public boolean isRemoteEndpoint() {
+ if (route.getEndpoint() != null) {
+ return route.getEndpoint().isRemote();
+ }
+ return false;
+ }
+
@Override
public boolean equals(Object o) {
return this == o || o != null && getClass() == o.getClass() &&
route.equals(((ManagedRoute) o).route);
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
index df7db610e44..c04c6fd1b28 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
@@ -90,8 +90,20 @@ public class CamelContextStatus extends ProcessWatchCommand {
row.throughput = thp.toString();
}
row.total = stats.get("exchangesTotal").toString();
- row.inflight =
stats.get("exchangesInflight").toString();
+ Object num = stats.get("remoteExchangesTotal");
+ if (num != null) {
+ row.totalRemote = num.toString();
+ }
row.failed =
stats.get("exchangesFailed").toString();
+ num = stats.get("remoteExchangesFailed");
+ if (num != null) {
+ row.failedRemote = num.toString();
+ }
+ row.inflight =
stats.get("exchangesInflight").toString();
+ num = stats.get("remoteExchangesInflight");
+ if (num != null) {
+ row.inflightRemote = num.toString();
+ }
row.reloaded = stats.get("reloaded").toString();
Object last = stats.get("lastProcessingTime");
if (last != null) {
@@ -157,9 +169,9 @@ public class CamelContextStatus extends ProcessWatchCommand
{
new
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
new Column().header("ROUTE").with(this::getRoutes),
new Column().header("MSG/S").with(this::getThroughput),
- new Column().header("TOTAL").with(r -> r.total),
- new Column().header("FAIL").with(r -> r.failed),
- new Column().header("INFLIGHT").with(r -> r.inflight),
+ new Column().header("TOTAL").with(this::getTotal),
+ new Column().header("FAIL").with(this::getFailed),
+ new Column().header("INFLIGHT").with(this::getInflight),
new Column().header("LAST").with(r -> r.last),
new Column().header("DELTA").with(this::getDelta),
new
Column().header("SINCE-LAST").with(this::getSinceLast))));
@@ -208,6 +220,27 @@ public class CamelContextStatus extends
ProcessWatchCommand {
}
}
+ private String getTotal(Row r) {
+ if (r.totalRemote != null) {
+ return r.totalRemote + "/" + r.total;
+ }
+ return r.total;
+ }
+
+ private String getFailed(Row r) {
+ if (r.failedRemote != null) {
+ return r.failedRemote + "/" + r.failed;
+ }
+ return r.failed;
+ }
+
+ private String getInflight(Row r) {
+ if (r.inflightRemote != null) {
+ return r.inflightRemote + "/" + r.inflight;
+ }
+ return r.inflight;
+ }
+
private String getPlatform(Row r) {
if (r.platformVersion != null) {
return r.platform + " v" + r.platformVersion;
@@ -271,8 +304,11 @@ public class CamelContextStatus extends
ProcessWatchCommand {
long uptime;
String throughput;
String total;
+ String totalRemote;
String failed;
+ String failedRemote;
String inflight;
+ String inflightRemote;
String last;
String delta;
String sinceLastStarted;
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
index 89939bedc35..ac3eb7464f4 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.github.freva.asciitable.AsciiTable;
import com.github.freva.asciitable.Column;
@@ -73,6 +74,7 @@ public class CamelRouteStatus extends ProcessWatchCommand {
public Integer doProcessWatchCall() throws Exception {
List<Row> rows = new ArrayList<>();
+ AtomicBoolean remoteVisible = new AtomicBoolean();
List<Long> pids = findPids(name);
ProcessHandle.allProcesses()
.filter(ph -> pids.contains(ph.pid()))
@@ -94,6 +96,12 @@ public class CamelRouteStatus extends ProcessWatchCommand {
row.pid = Long.toString(ph.pid());
row.routeId = o.getString("routeId");
row.from = o.getString("from");
+ Boolean bool = o.getBoolean("remote");
+ if (bool != null) {
+ // older camel versions does not include this
information
+ remoteVisible.set(true);
+ row.remote = bool;
+ }
row.source = o.getString("source");
row.state = o.getString("state");
row.age = o.getString("uptime");
@@ -172,13 +180,13 @@ public class CamelRouteStatus extends ProcessWatchCommand
{
rows.sort(this::sortRow);
if (!rows.isEmpty()) {
- printTable(rows);
+ printTable(rows, remoteVisible.get());
}
return 0;
}
- protected void printTable(List<Row> rows) {
+ protected void printTable(List<Row> rows, boolean remoteVisible) {
printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows,
Arrays.asList(
new
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
new
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30,
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -188,9 +196,9 @@ public class CamelRouteStatus extends ProcessWatchCommand {
new
Column().header("FROM").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
.maxWidth(45, OverflowBehaviour.ELLIPSIS_RIGHT)
.with(this::getFrom),
- new
Column().header("FROM").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
- .maxWidth(45, OverflowBehaviour.NEWLINE)
- .with(this::getFrom),
+ new
Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER)
+ .dataAlign(HorizontalAlign.CENTER)
+ .with(this::getRemote),
new
Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
.with(r -> r.state),
new
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
@@ -260,6 +268,10 @@ public class CamelRouteStatus extends ProcessWatchCommand {
return s;
}
+ protected String getRemote(Row r) {
+ return r.remote ? "x" : "";
+ }
+
protected String getId(Row r) {
if (source && r.source != null) {
return sourceLocLine(r.source);
@@ -286,6 +298,7 @@ public class CamelRouteStatus extends ProcessWatchCommand {
long uptime;
String routeId;
String from;
+ boolean remote;
String source;
String state;
String age;
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
index 31cfc6a3a56..75d65f92a70 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
@@ -35,7 +35,7 @@ public class CamelRouteTop extends CamelRouteStatus {
}
@Override
- protected void printTable(List<Row> rows) {
+ protected void printTable(List<Row> rows, boolean remoteVisible) {
printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows,
Arrays.asList(
new
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
new
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30,
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -44,6 +44,9 @@ public class CamelRouteTop extends CamelRouteStatus {
.with(this::getId),
new
Column().header("FROM").dataAlign(HorizontalAlign.LEFT).maxWidth(40,
OverflowBehaviour.ELLIPSIS_RIGHT)
.with(this::getFrom),
+ new
Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER)
+ .dataAlign(HorizontalAlign.CENTER)
+ .with(this::getRemote),
new
Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
.with(r -> r.state),
new
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
index b65dd342f48..e0c39a4ccfb 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
@@ -19,6 +19,7 @@ package org.apache.camel.dsl.jbang.core.commands.process;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.github.freva.asciitable.AsciiTable;
import com.github.freva.asciitable.Column;
@@ -52,6 +53,7 @@ public class ListInflight extends ProcessWatchCommand {
public Integer doProcessWatchCall() throws Exception {
List<Row> rows = new ArrayList<>();
+ AtomicBoolean remoteVisible = new AtomicBoolean();
List<Long> pids = findPids(name);
ProcessHandle.allProcesses()
.filter(ph -> pids.contains(ph.pid()))
@@ -81,6 +83,12 @@ public class ListInflight extends ProcessWatchCommand {
jo = (JsonObject) arr.get(i);
row.exchangeId =
jo.getString("exchangeId");
row.fromRouteId =
jo.getString("fromRouteId");
+ Boolean bool =
jo.getBoolean("fromRemoteEndpoint");
+ if (bool != null) {
+ // older camel versions does not
include this information
+ remoteVisible.set(true);
+ row.fromRemoteEndpoint = bool;
+ }
row.atRouteId = jo.getString("atRouteId");
row.nodeId = jo.getString("nodeId");
row.elapsed = jo.getLong("elapsed");
@@ -101,6 +109,8 @@ public class ListInflight extends ProcessWatchCommand {
new
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30,
OverflowBehaviour.ELLIPSIS_RIGHT)
.with(r -> r.name),
new
Column().header("EXCHANGE-ID").dataAlign(HorizontalAlign.LEFT).with(r ->
r.exchangeId),
+ new
Column().header("REMOTE").visible(remoteVisible.get()).dataAlign(HorizontalAlign.CENTER)
+ .with(this::getRemote),
new
Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).maxWidth(25,
OverflowBehaviour.ELLIPSIS_RIGHT)
.with(r -> r.atRouteId),
new
Column().header("ID").dataAlign(HorizontalAlign.LEFT).maxWidth(25,
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -139,6 +149,10 @@ public class ListInflight extends ProcessWatchCommand {
return TimeUtils.printDuration(r.elapsed);
}
+ private String getRemote(Row r) {
+ return r.fromRemoteEndpoint ? "x" : "";
+ }
+
private static class Row implements Cloneable {
String pid;
String name;
@@ -146,6 +160,7 @@ public class ListInflight extends ProcessWatchCommand {
long uptime;
String exchangeId;
String fromRouteId;
+ boolean fromRemoteEndpoint;
String atRouteId;
String nodeId;
long elapsed;
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
index dd9c2213b90..a3dec68a651 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
@@ -82,8 +82,20 @@ public class ListProcess extends ProcessWatchCommand {
Map<String, ?> stats = context.getMap("statistics");
if (stats != null) {
row.total = stats.get("exchangesTotal").toString();
- row.inflight =
stats.get("exchangesInflight").toString();
+ Object num = stats.get("remoteExchangesTotal");
+ if (num != null) {
+ row.totalRemote = num.toString();
+ }
row.failed =
stats.get("exchangesFailed").toString();
+ num = stats.get("remoteExchangesFailed");
+ if (num != null) {
+ row.failedRemote = num.toString();
+ }
+ row.inflight =
stats.get("exchangesInflight").toString();
+ num = stats.get("remoteExchangesInflight");
+ if (num != null) {
+ row.inflightRemote = num.toString();
+ }
}
rows.add(row);
}
@@ -105,15 +117,36 @@ public class ListProcess extends ProcessWatchCommand {
new
Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
.with(r -> extractState(r.state)),
new
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.ago),
- new Column().header("TOTAL").with(r -> r.total),
- new Column().header("FAIL").with(r -> r.failed),
- new Column().header("INFLIGHT").with(r ->
r.inflight))));
+ new Column().header("TOTAL").with(this::getTotal),
+ new Column().header("FAIL").with(this::getFailed),
+ new
Column().header("INFLIGHT").with(this::getInflight))));
}
}
return 0;
}
+ private String getTotal(Row r) {
+ if (r.totalRemote != null) {
+ return r.totalRemote + "/" + r.total;
+ }
+ return r.total;
+ }
+
+ private String getFailed(Row r) {
+ if (r.failedRemote != null) {
+ return r.failedRemote + "/" + r.failed;
+ }
+ return r.failed;
+ }
+
+ private String getInflight(Row r) {
+ if (r.inflightRemote != null) {
+ return r.inflightRemote + "/" + r.inflight;
+ }
+ return r.inflight;
+ }
+
protected int sortRow(Row o1, Row o2) {
String s = sort;
int negate = 1;
@@ -141,8 +174,11 @@ public class ListProcess extends ProcessWatchCommand {
String ago;
long uptime;
String total;
+ String totalRemote;
String failed;
+ String failedRemote;
String inflight;
+ String inflightRemote;
}
}