CAMEL-8223: Inflight repository to allow browsing of current inflight exchanges


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/996e32c9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/996e32c9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/996e32c9

Branch: refs/heads/master
Commit: 996e32c9962faf5d123dd0662f09f914806f8ecd
Parents: 3b94318
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jan 9 10:18:33 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jan 9 13:41:46 2015 +0100

----------------------------------------------------------------------
 .../management/mbean/CamelOpenMBeanTypes.java   | 11 +++
 .../mbean/ManagedInflightRepositoryMBean.java   | 35 ++++++++
 .../DefaultManagementLifecycleStrategy.java     |  4 +
 .../mbean/ManagedInflightRepository.java        | 84 ++++++++++++++++++++
 .../ManagedInflightRepositoryTest.java          | 75 +++++++++++++++++
 5 files changed, 209 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/996e32c9/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 5d1fa41..2654a6c 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -121,4 +121,15 @@ public final class CamelOpenMBeanTypes {
                 new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
     }
 
+    public static TabularType listInflightExchangesTabularType() throws OpenDataException {
+        CompositeType ct = listInflightExchangesCompositeType();
+        return new TabularType("listInflightExchanges", "Lists inflight exchanges", ct, new String[]{"exchangeId"});
+    }
+
+    public static CompositeType listInflightExchangesCompositeType() throws OpenDataException {
+        return new CompositeType("exchanges", "Exchanges", new String[]{"exchangeId", "routeId", "nodeId", "duration", "elapsed"},
+                new String[]{"Exchange Id", "RouteId", "NodeId", "Duration", "Elapsed"},
+                new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/996e32c9/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
new file mode 100644
index 0000000..1b42758
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean {
+
+    @ManagedAttribute(description = "Current size of inflight exchanges.")
+    int getSize();
+
+    @ManagedOperation(description = "Current size of inflight exchanges which are from the given route.")
+    int size(String routeId);
+
+    @ManagedOperation(description = "Lists all the exchanges which are currently inflight")
+    TabularData browse();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/996e32c9/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 6291545..03d7ed1 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -58,6 +58,7 @@ import org.apache.camel.management.mbean.ManagedCamelContext;
 import org.apache.camel.management.mbean.ManagedConsumerCache;
 import org.apache.camel.management.mbean.ManagedEndpoint;
 import org.apache.camel.management.mbean.ManagedEndpointRegistry;
+import org.apache.camel.management.mbean.ManagedInflightRepository;
 import org.apache.camel.management.mbean.ManagedProducerCache;
 import org.apache.camel.management.mbean.ManagedRestRegistry;
 import org.apache.camel.management.mbean.ManagedRoute;
@@ -81,6 +82,7 @@ import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.processor.interceptor.Tracer;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.EventNotifier;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementAgent;
 import org.apache.camel.spi.ManagementAware;
@@ -468,6 +470,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
             answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
         } else if (service instanceof RestRegistry) {
             answer = new ManagedRestRegistry(context, (RestRegistry) service);
+        } else if (service instanceof InflightRepository) {
+            answer = new ManagedInflightRepository(context, (InflightRepository) service);
         } else if (service instanceof AsyncProcessorAwaitManager) {
             answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
         } else if (service instanceof RuntimeEndpointRegistry) {

http://git-wip-us.apache.org/repos/asf/camel/blob/996e32c9/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
new file mode 100644
index 0000000..1ab9421
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Collection;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
+import org.apache.camel.api.management.mbean.ManagedInflightRepositoryMBean;
+import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ *
+ */
+@ManagedResource(description = "Managed InflightRepository")
+public class ManagedInflightRepository extends ManagedService implements ManagedInflightRepositoryMBean {
+
+    private final InflightRepository inflightRepository;
+
+    public ManagedInflightRepository(CamelContext context, InflightRepository inflightRepository) {
+        super(context, inflightRepository);
+        this.inflightRepository = inflightRepository;
+    }
+
+    public InflightRepository getInflightRepository() {
+        return inflightRepository;
+    }
+
+    @Override
+    public int getSize() {
+        return inflightRepository.size();
+    }
+
+    @Override
+    public int size(String routeId) {
+        return inflightRepository.size(routeId);
+    }
+
+    @Override
+    public TabularData browse() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType());
+            Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse();
+            for (InflightRepository.InflightExchange entry : exchanges) {
+                CompositeType ct = CamelOpenMBeanTypes.listInflightExchangesCompositeType();
+                String exchangeId = entry.getExchange().getExchangeId();
+                String routeId = entry.getRouteId();
+                String nodeId = entry.getNodeId();
+                String duration = "" + entry.getDuration();
+                String elapsed = "" + entry.getElapsed();
+
+                CompositeData data = new CompositeDataSupport(ct,
+                        new String[]{"exchangeId", "routeId", "nodeId", "duration", "elapsed"},
+                        new Object[]{exchangeId, routeId, nodeId, duration, elapsed});
+                answer.put(data);
+            }
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/996e32c9/camel-core/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java
new file mode 100644
index 0000000..bb0a7e7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ManagedInflightRepositoryTest extends ManagementTestSupport {
+
+    public void testInflightRepository() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("foo")
+                        .to("mock:a")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                MBeanServer mbeanServer = getMBeanServer();
+                                ObjectName name = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultInflightRepository");
+
+                                Integer size = (Integer) mbeanServer.getAttribute(name, "Size");
+                                assertEquals(1, size.intValue());
+
+                                Integer routeSize = (Integer) mbeanServer.invoke(name, "size", new Object[]{"foo"}, new String[]{"java.lang.String"});
+                                assertEquals(1, routeSize.intValue());
+
+                                TabularData data = (TabularData) mbeanServer.invoke(name, "browse", null, null);
+                                assertNotNull(data);
+
+                                assertEquals(1, data.size());
+                            }
+                        }).id("myProcessor")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}

Reply via email to