Author: cwiklik
Date: Tue Oct 16 16:30:51 2018
New Revision: 1844027

URL: http://svn.apache.org/viewvc?rev=1844027&view=rev
Log:
UIMA-5894 Modified the Camel route to use JmsTemplate to publish JMS pings that test 
broker viability. The JmsTemplate creates a new broker connection for every ping.

Modified:
    
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/config/WebServerConfiguration.java
    
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/self/message/WebServerStateProcessor.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/config/WebServerConfiguration.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/config/WebServerConfiguration.java?rev=1844027&r1=1844026&r2=1844027&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/config/WebServerConfiguration.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/config/WebServerConfiguration.java
 Tue Oct 16 16:30:51 2018
@@ -29,9 +29,12 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.jms.JmsMessage;
+import org.apache.uima.ducc.common.authentication.BrokerCredentials;
+import 
org.apache.uima.ducc.common.authentication.BrokerCredentials.Credentials;
 import org.apache.uima.ducc.common.config.CommonConfiguration;
 import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
 import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.transport.DuccTransportConfiguration;
 import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
@@ -62,7 +65,9 @@ public class WebServerConfiguration {
        private DuccId jobid = null;
        
        private AtomicBoolean singleton = new AtomicBoolean(false);
+       private Credentials brokerCredentials;
        
+
        /**
         * Instantiate {@link WebServerEventListener} which will handle 
incoming messages.
         * 
@@ -109,13 +114,17 @@ public class WebServerConfiguration {
         */
        private RouteBuilder routeBuilderForWebServerStatePost(final String 
targetEndpointToReceiveWebServerStateUpdate, final int statePublishRate) throws 
Exception {
                final WebServerStateProcessor wssp =  // an object responsible 
for generating the state 
-                       new WebServerStateProcessor();
+                               new WebServerStateProcessor(common.brokerUrl,
+                                               
brokerCredentials.getUsername(), 
+                                               brokerCredentials.getPassword(),
+                                               
targetEndpointToReceiveWebServerStateUpdate
+                                               );
                
                return new RouteBuilder() {
                      public void configure() {                     
                        
                        final Predicate blastFilter = new 
DuccBlastGuardPredicate(duccLogger);
-                       
+                       onException(Exception.class).handled(true).process(new 
ErrorProcessor());
                        
from("timer:webserverStateDumpTimer?fixedRate=true&period=" + statePublishRate)
                              // This route uses a filter to prevent sudden 
bursts of messages which
                                  // may flood DUCC daemons causing chaos. The 
filter disposes any event
@@ -124,12 +133,19 @@ public class WebServerConfiguration {
                              //.process(xmStart)
                                  .process(wssp)
                                  //.process(xmEnded)
-                                 
.to(targetEndpointToReceiveWebServerStateUpdate)
+                                 
//.to(targetEndpointToReceiveWebServerStateUpdate)
                                  ;
                      }
                    };
        }
-       
+         public class ErrorProcessor implements Processor {
+
+                   public void process(Exchange exchange) throws Exception {
+                     // the caused by exception is stored in a property on the 
exchange
+                     Throwable causedBy = 
exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+                     duccLogger.error("ErrorProcessor.process", jobid, 
causedBy);
+                   }
+                 }
        /**
         * Creates and initializes {@link WebServerComponent} instance. @Bean 
annotation identifies {@link WebServerComponent}
         * as a Spring framework Bean which will be managed by Spring 
container.  
@@ -143,6 +159,19 @@ public class WebServerConfiguration {
                String methodName = "webServer";
                WebServerComponent ws = null;
                try {
+                       String brokerCredentialsFile = 
+                                       
System.getProperty("ducc.broker.credentials.file");
+                       duccLogger.info("webServer", jobid, "Credentials 
File:"+brokerCredentialsFile);
+                   String path =
+                              
Utils.resolvePlaceholderIfExists(brokerCredentialsFile, System.getProperties());
+                       duccLogger.info("webServer", jobid, "Path:"+path);
+                   brokerCredentials = BrokerCredentials.get(path);
+                       
+               } catch( Throwable e) {
+                       duccLogger.error("webServer", jobid, e);
+                       
+               }
+               try {
                        if(singleton.getAndSet(true)) {
                                try {
                                        throw new RuntimeException("singleton 
already present!");

Modified: 
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/self/message/WebServerStateProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/self/message/WebServerStateProcessor.java?rev=1844027&r1=1844026&r2=1844027&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/self/message/WebServerStateProcessor.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/self/message/WebServerStateProcessor.java
 Tue Oct 16 16:30:51 2018
@@ -18,20 +18,75 @@
 */
 package org.apache.uima.ducc.ws.self.message;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
 
 /**
  * Processor of received webserver messages to self via broker
  */
 public class WebServerStateProcessor implements Processor {
-       
+       private final String brokerUrl;
+       private final String username;
+       private final String password;
+       private final String endpoint;
+       private DuccLogger duccLogger = 
DuccLogger.getLogger(WebServerStateProcessor.class);
+       public WebServerStateProcessor(String brokerUrl, String username, 
String password, String endpoint) {
+               this.brokerUrl = brokerUrl;
+               this.username = username;
+               this.password = password;
+               this.endpoint = endpoint;
+       }
        @Override
        public void process(Exchange exchange) throws Exception {
-               WebServerStateDuccEvent wse = new WebServerStateDuccEvent();
-               WebServerState wss = new WebServerState();
-               wse.setState(wss);
-               exchange.getIn().setBody(wse);
+               try {
+                       ActiveMQConnectionFactory amqcf =
+                                       new ActiveMQConnectionFactory(username, 
password, brokerUrl);
+                       amqcf.setTrustAllPackages(true);
+                       // this dispatcher will create a new connection for 
every ping.
+                       // No cleanup is needed as this also closes session and 
connection
+                       // after each ping.
+                       JmsTemplate jmsDispatcher = 
+                                       new JmsTemplate(amqcf);
+                       // The endpoint is configured for Camel use and has the 
following
+                       // syntax: activemq:topic:<name>
+                       // Strip the activemq:topic: part to just get the name 
of the jms
+                       // topic which will be used to create ActiveMQTopic
+                       int pos = endpoint.indexOf("topic:")+"topic:".length();
+                       
+                       String topicName = endpoint.substring(pos);
+                       ActiveMQTopic topic = new ActiveMQTopic(topicName);
+                       jmsDispatcher.send(topic, new MessageCreator() {
+                               
+                               @Override
+                               public Message createMessage(Session session) 
throws JMSException {
+                                       // create a ping event message
+                                       ObjectMessage body = 
session.createObjectMessage();
+                                       WebServerStateDuccEvent wse = new 
WebServerStateDuccEvent();
+                                       WebServerState wss = new 
WebServerState();
+                                       wse.setState(wss);
+                                       body.setObject(wse);
+                                       return body;
+                               }
+                       
+                       });
+                       
+               } catch( Throwable t ) {
+                       duccLogger.error("process",null, t);
+               }
+//             WebServerStateDuccEvent wse = new WebServerStateDuccEvent();
+//             WebServerState wss = new WebServerState();
+//             wse.setState(wss);
+//             exchange.getIn().setBody(wse);
        }
 
 }


Reply via email to