Author: cmoulliard Date: Thu May 24 14:55:58 2012 New Revision: 1342288 URL: http://svn.apache.org/viewvc?rev=1342288&view=rev Log: Add org.apache.camel.component.websocket.WebsocketTwoRoutesExampleTest
Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java?rev=1342288&r1=1342287&r2=1342288&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java Thu May 24 14:55:58 2012 @@ -195,6 +195,8 @@ public class WebsocketComponent extends connector = new SelectChannelConnector(); } + LOG.debug("Jetty Connector added : " + connector.getName()); + if (port != null) { connector.setPort(port); } else { @@ -242,6 +244,7 @@ public class WebsocketComponent extends connectorRef = new ConnectorRef(server, connector, defaultServlet); CONNECTORS.put(connectorKey, connectorRef); + LOG.debug("Jetty Server started for host : " + connector.getHost() + ", on port : " + connector.getPort()); server.start(); } else { @@ -417,6 +420,7 @@ public class WebsocketComponent extends WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync); servlets.put(pathSpec, servlet); handler.addServlet(new ServletHolder(servlet), pathSpec); + LOG.debug("WebSocket servlet added for the following path : " + pathSpec); return servlet; } Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java?rev=1342288&r1=1342287&r2=1342288&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java Thu May 24 14:55:58 2012 @@ -33,8 +33,8 @@ public class WebsocketConsumer extends D @Override public void start() throws Exception { - super.start(); endpoint.connect(this); + super.start(); } @Override @@ -48,6 +48,15 @@ public class WebsocketConsumer extends D } public void sendMessage(final String connectionKey, final String message) { + +/* if (!endpoint.isStarted()) { + try { + endpoint.connect(this); + } catch (Exception e) { + e.printStackTrace(); + } + }*/ + final Exchange exchange = getEndpoint().createExchange(); // set header and body Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1342288&r1=1342287&r2=1342288&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java Thu May 24 14:55:58 2012 @@ -42,6 +42,10 @@ public class WebsocketProducer extends D Message in = exchange.getIn(); String message = in.getMandatoryBody(String.class); +/* if (!endpoint.isStarted()) { + endpoint.connect(this); + }*/ + if (isSendToAllSet(in)) { sendToAll(store, message, exchange); } else { @@ -61,6 +65,19 @@ public class WebsocketProducer extends D return endpoint; } + + @Override + public void start() throws Exception { + endpoint.connect(this); + super.start(); + } + + @Override + public void stop() throws Exception { + endpoint.disconnect(this); + super.stop(); + } + boolean isSendToAllSet(Message in) { // header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint) Boolean value = in.getHeader(WebsocketConstants.SEND_TO_ALL, sendToAll, Boolean.class); Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java?rev=1342288&view=auto ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java (added) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java Thu May 24 14:55:58 2012 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.websocket.WebSocket; +import com.ning.http.client.websocket.WebSocketTextListener; +import com.ning.http.client.websocket.WebSocketUpgradeHandler; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class WebsocketTwoRoutesExampleTest extends CamelTestSupport { + + private static List<String> received = new ArrayList<String>(); + private static CountDownLatch latch = new CountDownLatch(1); + + @Test + public void testWSHttpCallEcho1() throws Exception { + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://127.0.0.1:9292/echo").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(String fragment, boolean last) { + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendTextMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + assertEquals("BeerBeer", received.get(0)); + + websocket.close(); + c.close(); + } + + @Test + public void testWSHttpCallEcho2() throws Exception { + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://127.0.0.1:9393/echo").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(String fragment, boolean last) { + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendTextMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + assertEquals("BeerBeer", received.get(0)); + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + from("websocket://localhost:9292/echo") + .log(">>> Message received from WebSocket Client : ${body}") + .transform().simple("${body}${body}") + .to("websocket://localhost:9292/echo"); + + from("websocket://localhost:9393/echo") + .log(">>> Message received from WebSocket Client : ${body}") + .transform().simple("${body}${body}") + .to("websocket://localhost:9292/echo"); + } + }; + } +}