Author: davsclaus
Date: Wed Jun 13 09:25:16 2012
New Revision: 1349704
URL: http://svn.apache.org/viewvc?rev=1349704&view=rev
Log:
CAMEL-5225: Configured encoders and decoders must be shareable or implement
ChannelHandlerFactory to be safely used with Netty.
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java?rev=1349704&view=auto
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
(added)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
Wed Jun 13 09:25:16 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.charset.Charset;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.serialization.ClassResolvers;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+
+/**
+ * Helper to create commonly used {@link ChannelHandlerFactory} instances.
+ */
+public final class ChannelHandlerFactories {
+
+ private ChannelHandlerFactories() {
+ }
+
+ public static ChannelHandlerFactory newStringEncoder(Charset charset) {
+ return new ShareableChannelHandlerFactory(new StringEncoder(charset));
+ }
+
+ public static ChannelHandlerFactory newStringDecoder(Charset charset) {
+ return new ShareableChannelHandlerFactory(new StringDecoder(charset));
+ }
+
+ public static ChannelHandlerFactory newObjectDecoder() {
+ return new ChannelHandlerFactory() {
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return new
ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+ }
+ };
+ }
+
+ public static ChannelHandlerFactory newObjectEncoder() {
+ return new ShareableChannelHandlerFactory(new ObjectEncoder());
+ }
+
+ public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final
int maxFrameLength, final ChannelBuffer[] delimiters) {
+ return new ChannelHandlerFactory() {
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return new DelimiterBasedFrameDecoder(maxFrameLength, true,
delimiters);
+ }
+ };
+ }
+
+ public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final
int maxFrameLength, final int lengthFieldOffset,
+ final
int lengthFieldLength, final int lengthAdjustment,
+ final
int initialBytesToStrip) {
+ return new ChannelHandlerFactory() {
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return new LengthFieldBasedFrameDecoder(maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
+ }
+ };
+ }
+
+}
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java?rev=1349704&view=auto
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
(added)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
Wed Jun 13 09:25:16 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * Factory for creating new {@link ChannelHandler} used for non shareable
+ * encoders and decoders configured on the Camel {@link NettyComponent}.
+ * <p/>
+ * This is needed as Netty's {@link ChannelHandler} is often not shareable
+ * and therefore a new instance must be created when a handler is being
+ * added to a pipeline.
+ */
+public interface ChannelHandlerFactory extends ChannelHandler {
+
+ /**
+ * Creates a new {@link ChannelHandler} to be used.
+ */
+ ChannelHandler newChannelHandler();
+}
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
Wed Jun 13 09:25:16 2012
@@ -22,9 +22,8 @@ import javax.net.ssl.SSLEngine;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
@@ -46,25 +45,40 @@ public class DefaultClientPipelineFactor
SslHandler sslHandler = configureClientSSLOnDemand(producer);
if (sslHandler != null) {
LOG.debug("Client SSL handler configured and added to the
ChannelPipeline");
- channelPipeline.addLast("ssl", sslHandler);
+ addToPipeline("ssl", channelPipeline, sslHandler);
}
- List<ChannelUpstreamHandler> decoders =
producer.getConfiguration().getDecoders();
+ List<ChannelHandler> decoders =
producer.getConfiguration().getDecoders();
for (int x = 0; x < decoders.size(); x++) {
- channelPipeline.addLast("decoder-" + x, decoders.get(x));
+ ChannelHandler decoder = decoders.get(x);
+ if (decoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel as it may not be shareable
+ decoder = ((ChannelHandlerFactory)
decoder).newChannelHandler();
+ }
+ addToPipeline("decoder-" + x, channelPipeline, decoder);
}
- List<ChannelDownstreamHandler> encoders =
producer.getConfiguration().getEncoders();
+ List<ChannelHandler> encoders =
producer.getConfiguration().getEncoders();
for (int x = 0; x < encoders.size(); x++) {
- channelPipeline.addLast("encoder-" + x, encoders.get(x));
+ ChannelHandler encoder = encoders.get(x);
+ if (encoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel as it may not be shareable
+ encoder = ((ChannelHandlerFactory)
encoder).newChannelHandler();
+ }
+ addToPipeline("encoder-" + x, channelPipeline, encoder);
}
// our handler must be added last
- channelPipeline.addLast("handler", new ClientChannelHandler(producer));
+ addToPipeline("handler", channelPipeline, new
ClientChannelHandler(producer));
+ LOG.trace("Created ChannelPipeline: {}", channelPipeline);
return channelPipeline;
}
+ private void addToPipeline(String name, ChannelPipeline pipeline,
ChannelHandler handler) {
+ pipeline.addLast(name, handler);
+ }
+
private SslHandler configureClientSSLOnDemand(NettyProducer producer)
throws Exception {
if (!producer.getConfiguration().isSsl()) {
return null;
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
Wed Jun 13 09:25:16 2012
@@ -22,9 +22,8 @@ import javax.net.ssl.SSLEngine;
import org.apache.camel.component.netty.handlers.ServerChannelHandler;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
@@ -46,24 +45,40 @@ public class DefaultServerPipelineFactor
SslHandler sslHandler = configureServerSSLOnDemand(consumer);
if (sslHandler != null) {
LOG.debug("Server SSL handler configured and added as an
interceptor against the ChannelPipeline");
- channelPipeline.addLast("ssl", sslHandler);
+ addToPipeline("ssl", channelPipeline, sslHandler);
}
- List<ChannelDownstreamHandler> encoders =
consumer.getConfiguration().getEncoders();
+
+ List<ChannelHandler> encoders =
consumer.getConfiguration().getEncoders();
for (int x = 0; x < encoders.size(); x++) {
- channelPipeline.addLast("encoder-" + x, encoders.get(x));
+ ChannelHandler encoder = encoders.get(x);
+ if (encoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel as it may not be shareable
+ encoder = ((ChannelHandlerFactory)
encoder).newChannelHandler();
+ }
+ addToPipeline("encoder-" + x, channelPipeline, encoder);
}
- List<ChannelUpstreamHandler> decoders =
consumer.getConfiguration().getDecoders();
+ List<ChannelHandler> decoders =
consumer.getConfiguration().getDecoders();
for (int x = 0; x < decoders.size(); x++) {
- channelPipeline.addLast("decoder-" + x, decoders.get(x));
+ ChannelHandler decoder = decoders.get(x);
+ if (decoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel as it may not be shareable
+ decoder = ((ChannelHandlerFactory)
decoder).newChannelHandler();
+ }
+ addToPipeline("decoder-" + x, channelPipeline, decoder);
}
// our handler must be added last
- channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+ addToPipeline("handler", channelPipeline, new
ServerChannelHandler(consumer));
+ LOG.trace("Created ChannelPipeline: {}", channelPipeline);
return channelPipeline;
}
-
+
+ private void addToPipeline(String name, ChannelPipeline pipeline,
ChannelHandler handler) {
+ pipeline.addLast(name, handler);
+ }
+
private SslHandler configureServerSSLOnDemand(NettyConsumer consumer)
throws Exception {
if (!consumer.getConfiguration().isSsl()) {
return null;
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
Wed Jun 13 09:25:16 2012
@@ -48,6 +48,9 @@ public class NettyComponent extends Defa
config.parseURI(new URI(remaining), parameters, this);
+ // validate config
+ config.validateConfiguration();
+
NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this,
config);
nettyEndpoint.setTimer(getTimer());
setProperties(nettyEndpoint.getConfiguration(), parameters);
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
Wed Jun 13 09:25:16 2012
@@ -26,16 +26,11 @@ import java.util.Map;
import org.apache.camel.LoggingLevel;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.jsse.SSLContextParameters;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.handler.codec.frame.Delimiters;
-import org.jboss.netty.handler.codec.serialization.ClassResolvers;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
-import org.jboss.netty.handler.codec.string.StringDecoder;
-import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
@@ -62,8 +57,8 @@ public class NettyConfiguration implemen
private File keyStoreFile;
private File trustStoreFile;
private SslHandler sslHandler;
- private List<ChannelDownstreamHandler> encoders = new
ArrayList<ChannelDownstreamHandler>();
- private List<ChannelUpstreamHandler> decoders = new
ArrayList<ChannelUpstreamHandler>();
+ private List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
+ private List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
private boolean ssl;
private long sendBufferSize = 65536;
private long receiveBufferSize = 65536;
@@ -84,13 +79,14 @@ public class NettyConfiguration implemen
/**
* Returns a copy of this configuration
*/
+ @SuppressWarnings("unchecked")
public NettyConfiguration copy() {
try {
NettyConfiguration answer = (NettyConfiguration) clone();
// make sure the lists is copied in its own instance
- List<ChannelDownstreamHandler> encodersCopy = new
ArrayList<ChannelDownstreamHandler>(encoders);
+ List encodersCopy = new ArrayList(encoders);
answer.setEncoders(encodersCopy);
- List<ChannelUpstreamHandler> decodersCopy = new
ArrayList<ChannelUpstreamHandler>(decoders);
+ List decodersCopy = new ArrayList(decoders);
answer.setDecoders(decodersCopy);
return answer;
} catch (CloneNotSupportedException e) {
@@ -98,6 +94,37 @@ public class NettyConfiguration implemen
}
}
+ public void validateConfiguration() {
+ // validate that the encoders is either shareable or is a handler factory
+ for (ChannelHandler encoder : encoders) {
+ if (encoder instanceof ChannelHandlerFactory) {
+ continue;
+ }
+ if (ObjectHelper.getAnnotation(encoder,
ChannelHandler.Sharable.class) != null) {
+ continue;
+ }
+ LOG.warn("The encoder {} is not @Shareable or an
ChannelHandlerFactory instance. The encoder cannot safely be used.", encoder);
+ }
+
+ // validate that the decoders is either shareable or is a handler factory
+ for (ChannelHandler decoder : decoders) {
+ if (decoder instanceof ChannelHandlerFactory) {
+ continue;
+ }
+ if (ObjectHelper.getAnnotation(decoder,
ChannelHandler.Sharable.class) != null) {
+ continue;
+ }
+ LOG.warn("The decoder {} is not @Shareable or an
ChannelHandlerFactory instance. The decoder cannot safely be used.", decoder);
+ }
+ if (sslHandler != null) {
+ boolean factory = sslHandler instanceof ChannelHandlerFactory;
+ boolean shareable = ObjectHelper.getAnnotation(sslHandler,
ChannelHandler.Sharable.class) != null;
+ if (!factory && !shareable) {
+ LOG.warn("The sslHandler {} is not @Shareable or an
ChannelHandlerFactory instance. The sslHandler cannot safely be used.",
sslHandler);
+ }
+ }
+ }
+
public void parseURI(URI uri, Map<String, Object> parameters,
NettyComponent component) throws Exception {
protocol = uri.getScheme();
@@ -118,10 +145,10 @@ public class NettyConfiguration implemen
serverPipelineFactory =
component.resolveAndRemoveReferenceParameter(parameters,
"serverPipelineFactory", ServerPipelineFactory.class, null);
// set custom encoders and decoders first
- List<ChannelDownstreamHandler> referencedEncoders =
component.resolveAndRemoveReferenceListParameter(parameters, "encoders",
ChannelDownstreamHandler.class, null);
- addToHandlersList(encoders, referencedEncoders,
ChannelDownstreamHandler.class);
- List<ChannelUpstreamHandler> referencedDecoders =
component.resolveAndRemoveReferenceListParameter(parameters, "decoders",
ChannelUpstreamHandler.class, null);
- addToHandlersList(decoders, referencedDecoders,
ChannelUpstreamHandler.class);
+ List<ChannelHandler> referencedEncoders =
component.resolveAndRemoveReferenceListParameter(parameters, "encoders",
ChannelHandler.class, null);
+ addToHandlersList(encoders, referencedEncoders, ChannelHandler.class);
+ List<ChannelHandler> referencedDecoders =
component.resolveAndRemoveReferenceListParameter(parameters, "decoders",
ChannelHandler.class, null);
+ addToHandlersList(decoders, referencedDecoders, ChannelHandler.class);
// then set parameters with the help of the camel context type converters
EndpointHelper.setReferenceProperties(component.getCamelContext(),
this, parameters);
@@ -133,9 +160,10 @@ public class NettyConfiguration implemen
// are we textline or object?
if (isTextline()) {
Charset charset = getEncoding() != null ?
Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
- encoders.add(new StringEncoder(charset));
- decoders.add(new
DelimiterBasedFrameDecoder(decoderMaxLineLength, true, delimiter ==
TextLineDelimiter.LINE ? Delimiters.lineDelimiter() :
Delimiters.nulDelimiter()));
- decoders.add(new StringDecoder(charset));
+
encoders.add(ChannelHandlerFactories.newStringEncoder(charset));
+ ChannelBuffer[] delimiters = delimiter ==
TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter();
+
decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength,
delimiters));
+
decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
if (LOG.isDebugEnabled()) {
LOG.debug("Using textline encoders and decoders with
charset: {}, delimiter: {} and decoderMaxLineLength: {}",
@@ -143,8 +171,8 @@ public class NettyConfiguration implemen
}
} else {
// object serializable is then used
- encoders.add(new ObjectEncoder());
- decoders.add(new
ObjectDecoder(ClassResolvers.weakCachingResolver(null)));
+ encoders.add(ChannelHandlerFactories.newObjectEncoder());
+ decoders.add(ChannelHandlerFactories.newObjectDecoder());
LOG.debug("Using object encoders and decoders");
}
@@ -291,42 +319,42 @@ public class NettyConfiguration implemen
this.sslHandler = sslHandler;
}
- public List<ChannelDownstreamHandler> getEncoders() {
+ public List<ChannelHandler> getDecoders() {
+ return decoders;
+ }
+
+ public void setDecoders(List<ChannelHandler> decoders) {
+ this.decoders = decoders;
+ }
+
+ public List<ChannelHandler> getEncoders() {
return encoders;
}
- public List<ChannelUpstreamHandler> getDecoders() {
- return decoders;
+ public void setEncoders(List<ChannelHandler> encoders) {
+ this.encoders = encoders;
}
- public ChannelDownstreamHandler getEncoder() {
+ public ChannelHandler getEncoder() {
return encoders.isEmpty() ? null : encoders.get(0);
}
- public void setEncoder(ChannelDownstreamHandler encoder) {
+ public void setEncoder(ChannelHandler encoder) {
if (!encoders.contains(encoder)) {
encoders.add(encoder);
}
}
- public void setEncoders(List<ChannelDownstreamHandler> encoders) {
- this.encoders = encoders;
- }
-
- public ChannelUpstreamHandler getDecoder() {
+ public ChannelHandler getDecoder() {
return decoders.isEmpty() ? null : decoders.get(0);
}
- public void setDecoder(ChannelUpstreamHandler decoder) {
+ public void setDecoder(ChannelHandler decoder) {
if (!decoders.contains(decoder)) {
decoders.add(decoder);
}
}
- public void setDecoders(List<ChannelUpstreamHandler> decoders) {
- this.decoders = decoders;
- }
-
public long getSendBufferSize() {
return sendBufferSize;
}
@@ -451,17 +479,6 @@ public class NettyConfiguration implemen
return host + ":" + port;
}
- private <T> void addToHandlersList(List<T> configured, List<T> handlers,
Class<T> handlerType) {
- if (handlers != null) {
- for (int x = 0; x < handlers.size(); x++) {
- T handler = handlers.get(x);
- if (handlerType.isInstance(handler)) {
- configured.add(handler);
- }
- }
- }
- }
-
public void setClientPipelineFactory(ClientPipelineFactory
clientPipelineFactory) {
this.clientPipelineFactory = clientPipelineFactory;
}
@@ -477,7 +494,7 @@ public class NettyConfiguration implemen
public ServerPipelineFactory getServerPipelineFactory() {
return serverPipelineFactory;
}
-
+
public int getWorkerCount() {
return workerCount;
}
@@ -493,4 +510,15 @@ public class NettyConfiguration implemen
public void setSslContextParameters(SSLContextParameters
sslContextParameters) {
this.sslContextParameters = sslContextParameters;
}
+
+ private static <T> void addToHandlersList(List<T> configured, List<T>
handlers, Class<T> handlerType) {
+ if (handlers != null) {
+ for (int x = 0; x < handlers.size(); x++) {
+ T handler = handlers.get(x);
+ if (handlerType.isInstance(handler)) {
+ configured.add(handler);
+ }
+ }
+ }
+ }
}
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java?rev=1349704&view=auto
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
(added)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
Wed Jun 13 09:25:16 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * A {@link ChannelHandlerFactory} returning a shareable {@link ChannelHandler}.
+ */
+public class ShareableChannelHandlerFactory implements ChannelHandlerFactory {
+
+ private final ChannelHandler channelHandler;
+
+ public ShareableChannelHandlerFactory(ChannelHandler channelHandler) {
+ this.channelHandler = channelHandler;
+ }
+
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return channelHandler;
+ }
+}
Modified:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
(original)
+++
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
Wed Jun 13 09:25:16 2012
@@ -24,6 +24,7 @@ import org.apache.camel.builder.RouteBui
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
@@ -38,7 +39,8 @@ public class MultipleCodecsTest extends
JndiRegistry registry = super.createRegistry();
// START SNIPPET: registry-beans
- LengthFieldBasedFrameDecoder lengthDecoder = new
LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+ ChannelHandlerFactory lengthDecoder =
ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+
StringDecoder stringDecoder = new StringDecoder();
registry.bind("length-decoder", lengthDecoder);
registry.bind("string-decoder", stringDecoder);
@@ -48,11 +50,11 @@ public class MultipleCodecsTest extends
registry.bind("length-encoder", lengthEncoder);
registry.bind("string-encoder", stringEncoder);
- List<ChannelUpstreamHandler> decoders = new
ArrayList<ChannelUpstreamHandler>();
+ List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
decoders.add(lengthDecoder);
decoders.add(stringDecoder);
- List<ChannelDownstreamHandler> encoders = new
ArrayList<ChannelDownstreamHandler>();
+ List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
encoders.add(lengthEncoder);
encoders.add(stringEncoder);
Added:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java?rev=1349704&view=auto
==============================================================================
---
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
(added)
+++
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
Wed Jun 13 09:25:16 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class UnsharableCodecsConflicts2Test extends BaseNettyTest {
+
+ static final byte[] LENGTH_HEADER = {0x00, 0x00, 0x40, 0x00}; // 16384
bytes
+
+ private Processor processor = new P();
+ private int port;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ // create a single decoder
+ ChannelHandlerFactory decoder =
ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+ registry.bind("length-decoder", decoder);
+
+ return registry;
+ }
+
+ @Test
+ public void unsharableCodecsConflictsTest() throws Exception {
+ byte[] data1 = new byte[8192];
+ byte[] data2 = new byte[16383];
+ Arrays.fill(data1, (byte) 0x38);
+ Arrays.fill(data2, (byte) 0x39);
+ byte[] body1 = (new String(LENGTH_HEADER) + new
String(data1)).getBytes();
+ byte[] body2 = (new String(LENGTH_HEADER) + new
String(data2)).getBytes();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived(new String(data2) + "9");
+
+ Socket client1 = getSocket("localhost", port);
+ Socket client2 = getSocket("localhost", port);
+
+ // use two clients to send to the same server at the same time
+ try {
+ sendBuffer(body2, client2);
+ sendBuffer(body1, client1);
+ sendBuffer(new String("9").getBytes(), client2);
+ } catch (Exception e) {
+ log.error("", e);
+ } finally {
+ client1.close();
+ client2.close();
+ }
+
+ mock.assertIsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ port = getPort();
+
+
from("netty:tcp://localhost:{{port}}?decoder=#length-decoder&sync=false")
+ .process(processor)
+ .to("mock:result");
+ }
+ };
+ }
+
+ private static Socket getSocket(String host, int port) throws IOException {
+ Socket s = new Socket(host, port);
+ s.setSoTimeout(60000);
+ return s;
+ }
+
+ public static void sendBuffer(byte[] buf, Socket server) throws Exception {
+ OutputStream netOut = server.getOutputStream();
+ OutputStream dataOut = new BufferedOutputStream(netOut);
+ try {
+ dataOut.write(buf, 0, buf.length);
+ dataOut.flush();
+ } catch (Exception e) {
+ server.close();
+ throw e;
+ }
+ }
+
+ class P implements Processor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getOut().setBody(
+ new String(((BigEndianHeapChannelBuffer) exchange.getIn()
+ .getBody()).array()));
+ }
+ }
+}
Modified:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
(original)
+++
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
Wed Jun 13 09:25:16 2012
@@ -28,7 +28,6 @@ import org.apache.camel.component.mock.M
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.util.IOHelper;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.junit.Test;
/**
@@ -47,12 +46,11 @@ public class UnsharableCodecsConflictsTe
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
- // the decoders cannot be shared with multiple netty consumers, so we need one for each consumer
- LengthFieldBasedFrameDecoder lengthDecoder = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
- registry.bind("length-decoder", lengthDecoder);
-
- LengthFieldBasedFrameDecoder lengthDecoder2 = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
- registry.bind("length-decoder2", lengthDecoder2);
+ // we can share the decoder between multiple netty consumers, because they have the same configuration
+ // and we use a ChannelHandlerFactory
+ ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+ registry.bind("length-decoder", decoder);
+ registry.bind("length-decoder2", decoder);
return registry;
}
Modified:
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
(original)
+++
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
Wed Jun 13 09:25:16 2012
@@ -37,7 +37,7 @@
<!-- START SNIPPET: registry-beans -->
<util:list id="decoders" list-class="java.util.LinkedList">
- <bean class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder">
+ <bean class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder">
<constructor-arg value="1048576"/>
<constructor-arg value="0"/>
<constructor-arg value="4"/>
@@ -59,7 +59,7 @@
</bean>
<bean id="string-encoder" class="org.jboss.netty.handler.codec.string.StringEncoder"/>
- <bean id="length-decoder" class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder">
+ <bean id="length-decoder" class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder">
<constructor-arg value="1048576"/>
<constructor-arg value="0"/>
<constructor-arg value="4"/>