Author: letourneau Date: Tue Feb 5 21:18:41 2013 New Revision: 1442747 URL: http://svn.apache.org/viewvc?rev=1442747&view=rev Log: added publisher setup with JSON post
Modified: incubator/streams/trunk/streams-eip-routes/pom.xml incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java Modified: incubator/streams/trunk/streams-eip-routes/pom.xml URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/pom.xml?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-eip-routes/pom.xml (original) +++ incubator/streams/trunk/streams-eip-routes/pom.xml Tue Feb 5 21:18:41 2013 @@ -72,7 +72,7 @@ <Bundle-Version>${pom.version}</Bundle-Version> <Export-Package>${bundle.namespace};version="${pom.version}",org.apache.streams.messaging.configuration,org.apache.streams.messaging.routers,org.apache.streams.messaging.rules,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,org.apache.activemq,org.codehaus.jackson.*;version="${jackson.version}"</Export-Package> 
<Private-Package>${bundle.namespace}.messaging.routers.impl.*,${bundle.namespace}.messaging.rules.impl.*</Private-Package> - <Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package> + 
<Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package> <Include-Resource>src/main/resources</Include-Resource> </instructions> </configuration> Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- 
incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java (original) +++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java Tue Feb 5 21:18:41 2013 @@ -2,14 +2,16 @@ package org.apache.streams.messaging.pro import org.apache.camel.Exchange; import org.apache.camel.Processor; - -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; +import org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; public class ActivityPublisherRegistrationProcessor implements Processor{ - + private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class); public void process(Exchange exchange){ //add the necessary headers to the message so that the activity registration component //can do a lookup to either make a new processor and endpoint, or pass the message to the right one @@ -25,14 +27,25 @@ public class ActivityPublisherRegistrati // authentication, all that good stuff...happens in the registration module String body = exchange.getIn().getBody(String.class); - try{ - URI publisherUrl = new URI(body); - exchange.getOut().setHeader("activityPublisherUri",body); - exchange.getOut().setBody(body); - }catch(URISyntaxException e){ + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false); + + try { + + // read from file, convert it to user class + ActivityConsumer configuration = mapper.readValue(body, ActivityConsumer.class); + if (configuration.getSrc()==null){ + 
LOG.info("configuration src is null"); + throw new Exception(); + } + + exchange.getOut().setBody(configuration); + + } catch (Exception e) { + LOG.info("error: " + e); exchange.getOut().setFault(true); exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400); - exchange.getOut().setBody("POST should only contain a valid URI that is registering as an Activity Publisher."); + exchange.getOut().setBody("POST should contain a valid JSON configuration for registering as an Activity Publisher (check that src element is valid)."); } } Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java (original) +++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java Tue Feb 5 21:18:41 2013 @@ -39,7 +39,7 @@ public class ActivityConsumerRouter exte //todo: add some better scheme then getCount for URL... 
//todo: make the route again if consumer exists...and context doesn't have route if (activityConsumer.isAuthenticated()){ - ActivityConsumer existingConsumer = activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc()); + ActivityConsumer existingConsumer = activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc().toASCIIString()); if (existingConsumer==null){ Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml (original) +++ incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml Tue Feb 5 21:18:41 2013 @@ -7,6 +7,7 @@ <bundle.symbolicName>activity-consumer-bundle</bundle.symbolicName> <bundle.namespace>org.apache.streams.osgi.components.activityconsumer</bundle.namespace> <commons.log>1.1</commons.log> + <jackson.version>1.9.11</jackson.version> </properties> <modelVersion>4.0.0</modelVersion> @@ -70,7 +71,7 @@ <Bundle-Version>${pom.version}</Bundle-Version> <Export-Package>${bundle.namespace};version="${pom.version}",org.apache.streams.osgi.components.activityconsumer.impl</Export-Package> <Private-Package>${bundle.namespace}.impl.*</Private-Package> - <Import-Package>org.apache.streams.osgi.components.activityconsumer,org.apache.commons.logging</Import-Package> + <Import-Package>org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activityconsumer.impl,org.apache.commons.logging,org.codehaus.jackson.*;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package> 
<Include-Resource>src/main/resources</Include-Resource> </instructions> </configuration> @@ -88,6 +89,18 @@ </dependency> <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mrbean</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> <groupId>org.osgi</groupId> <artifactId>osgi_R4_compendium</artifactId> <version>1.0</version> @@ -100,6 +113,7 @@ <version>${commons.log}</version> <scope>provided</scope> </dependency> + </dependencies> </project> \ No newline at end of file Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java (original) +++ incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java Tue Feb 5 21:18:41 2013 @@ -1,12 +1,21 @@ package org.apache.streams.osgi.components.activityconsumer; +import org.codehaus.jackson.annotate.JsonTypeInfo; + +import java.net.URI; + + +@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class") public interface ActivityConsumer { public String receive(String activity); public void init(); - public String getSrc(); + public URI getSrc(); + public void setSrc(String src); public void setInRoute(String route); 
public String getInRoute(); + public String getAuthToken(); + public void setAuthToken(String token); public boolean isAuthenticated(); public void setAuthenticated(boolean authenticated); } Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java (original) +++ incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java Tue Feb 5 21:18:41 2013 @@ -20,7 +20,7 @@ public class ActivityConsumerWarehouseIm public void register(ActivityConsumer activityConsumer) { //key in warehouse is the activity publisher URI source - consumers.put(activityConsumer.getSrc(), activityConsumer); + consumers.put(activityConsumer.getSrc().toASCIIString(), activityConsumer); activityConsumer.init(); Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- 
incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java (original) +++ incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java Tue Feb 5 21:18:41 2013 @@ -3,6 +3,12 @@ package org.apache.streams.osgi.componen import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; + +import javax.tools.JavaFileManager; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; import java.util.List; import java.util.ArrayList; @@ -10,24 +16,40 @@ public class PushActivityConsumer implem private static final transient Log LOG = LogFactory.getLog(PushActivityConsumer.class); - private String src; + private URI src; + + private String authToken; private boolean authenticated; private String inRoute; - public PushActivityConsumer(String src){ - this.setSrc(src); + public PushActivityConsumer(){ + } - public String getSrc() { + + public URI getSrc() { return src; } public void setSrc(String src) { - this.src = src; + try{ + this.src = new URI(src); + + } catch (URISyntaxException e) { + this.src=null; + } + } + + public String getAuthToken() { + return authToken; + } + + public void setAuthToken(String authToken) { + this.authToken = authToken; } public boolean isAuthenticated() { Modified: incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml (original) +++ 
incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml Tue Feb 5 21:18:41 2013 @@ -69,7 +69,7 @@ <Bundle-Version>${pom.version}</Bundle-Version> <Export-Package>${bundle.namespace};version="${pom.version}"</Export-Package> <Private-Package>${bundle.namespace}.impl.*</Private-Package> - <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging</Import-Package> + <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging</Import-Package> <Include-Resource>src/main/resources</Include-Resource> </instructions> </configuration> Modified: incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java?rev=1442747&r1=1442746&r2=1442747&view=diff ============================================================================== --- incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java (original) +++ incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java Tue Feb 5 21:18:41 2013 @@ -5,6 +5,7 @@ import java.util.Date; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.streams.osgi.components.ActivityPublisherRegistration; +import 
org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; import org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer; public class ActivityPublisherRegistrationImpl implements ActivityPublisherRegistration { @@ -22,8 +23,8 @@ public class ActivityPublisherRegistrati String answer = prefix + " set body: " + body + " " + new Date(); LOG.info(">> call >>" + answer); - //should be configed like the subscriber = what type? polling? how often? etc... - PushActivityConsumer activityConsumer =new PushActivityConsumer(body.toString()); + + ActivityConsumer activityConsumer =(ActivityConsumer)body; //authenticate.. activityConsumer.setAuthenticated(true); return activityConsumer;