Author: rajdavies
Date: Thu Jan 15 10:45:30 2009
New Revision: 734779
URL: http://svn.apache.org/viewvc?rev=734779&view=rev
Log:
Added network concept - a logical group of channels to broadcast to
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
(with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
(with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
(with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
- copied, changed from r734428,
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
Removed:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
Thu Jan 15 10:45:30 2009
@@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.CompressionProcessor;
import org.apache.activeblaze.impl.processor.FragmentationProcessor;
import org.apache.activeblaze.impl.processor.Packet;
@@ -44,7 +44,7 @@
*
*
*/
-public class BlazeChannelImpl extends ChainedProcessor implements
BlazeChannel, ExceptionListener {
+public class BlazeChannelImpl extends DefaultChainedProcessor implements
BlazeChannel, ExceptionListener {
protected Map<Buffer, BlazeTopicListener> topicessageListenerMap = new
ConcurrentHashMap<Buffer, BlazeTopicListener>();
protected final IdGenerator idGenerator = new IdGenerator();
protected Buffer producerId;
@@ -125,7 +125,7 @@
FragmentationProcessor fp = new FragmentationProcessor();
fp.setMaxPacketSize(maxPacketSize);
result.setEnd(fp);
- ChainedProcessor reliable =
ReliableFactory.get(getConfiguration().getReliable());
+ DefaultChainedProcessor reliable =
ReliableFactory.get(getConfiguration().getReliable());
result.setEnd(reliable);
result.setEnd(transport);
return result;
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=734779&view=auto
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
(added)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+
+import java.net.URI;
+import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+
+/**
+ * Uses multicast to implement a Network
+ *
+ */
+public class MulticastNetwork extends DefaultChainedProcessor implements
Network, ExceptionListener{
+
+ private URI uri;
+ private URI managementURI;
+ private BaseTransport broadcast;
+ private BaseTransport management;
+ private String name = "";
+
+ /**
+ * @return the name
+ */
+ public String getName() {
+ return this.name;
+ }
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @param uri
+ * @see
org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
+ */
+ public void setManagementURI(URI uri) {
+ this.managementURI=uri;
+
+
+ }
+
+ /**
+ * @param uri
+ * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
+ */
+ public void setURI(URI uri) {
+ this.uri=uri;
+
+ }
+
+
+
+ /**
+ * @return true if initialized
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#init()
+ */
+ public boolean init() throws Exception {
+ boolean result = super.init();
+ if (result) {
+ this.broadcast = TransportFactory.get(this.uri);
+ this.broadcast.setName(getName() + "-Broadcast");
+ this.broadcast.setExceptionListener(this);
+ this.broadcast.init();
+ if(this.managementURI != null &&
!this.managementURI.equals(this.uri)){
+ this.management = TransportFactory.get(this.managementURI);
+ this.management.setName(getName() + "-Management");
+ this.management.setExceptionListener(this);
+ this.management.init();
+
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * @return true if shutDown
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#shutDown()
+ */
+ public boolean shutDown() throws Exception {
+ boolean result = super.shutDown();
+ if (this.broadcast!=null) {
+ this.broadcast.shutDown();
+ }
+ if (this.management!=null) {
+ this.management.shutDown();
+ }
+ return result;
+ }
+
+ /**
+ * @return true if started
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#start()
+ */
+ public boolean start() throws Exception {
+ boolean result = super.start();
+ if (this.broadcast!=null) {
+ this.broadcast.start();
+ }
+ if (this.management!=null) {
+ this.management.start();
+ }
+ return result;
+ }
+
+ /**
+ * @return true if stopped
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#stop()
+ */
+ public boolean stop() throws Exception {
+ boolean result = super.stop();
+ if (this.broadcast!=null) {
+ this.broadcast.stop();
+ }
+ if (this.management!=null) {
+ this.management.stop();
+ }
+ return result;
+ }
+
+
+ /**
+ * @param packet
+ * @throws Exception
+ * @see
org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+ */
+ public void downStreamManagement(Packet packet) throws Exception {
+ if (this.management != null) {
+ this.management.downStream(packet);
+ }else {
+ this.broadcast.downStream(packet);
+ }
+
+ }
+
+ /**
+ * @param packet
+ * @throws Exception
+ * @see
org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+ */
+ public void downStream(Packet packet) throws Exception {
+ this.broadcast.downStream(packet);
+
+ }
+
+ /**
+ * @param l
+ * @see
org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
+ */
+ public void setExceptionListener(ExceptionListener l) {
+ // TODO Auto-generated method stub
+
+ }
+ /**
+ * @param ex
+ * @see
org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+ */
+ public void onException(Exception ex) {
+ if (this.exceptionListener!=null) {
+ this.exceptionListener.onException(ex);
+ }
+
+ }
+
+
+
+}
Propchange:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=734779&view=auto
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
(added)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+import java.net.URI;
+import org.apache.activeblaze.Service;
+import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * A <code>Network</code> is a {@link Processor} and {@link Service} that
+ * defines the operations which can be applied to remote channel instances.
+ * It is configured with a name, a URI and an optional management URI.
+ */
+public interface Network extends Processor, Service {
+
+    /**
+     * Returns the name of this network.
+     *
+     * @return the name
+     */
+    String getName();
+
+    /**
+     * Assigns the name of this network.
+     *
+     * @param name the name to set
+     */
+    void setName(String name);
+
+    /**
+     * Configures the uri this <code>Network</code> should use.
+     *
+     * @param uri the uri to use
+     */
+    void setURI(URI uri);
+
+    /**
+     * Configures the uri this <code>Network</code> should use for management.
+     *
+     * @param uri the management uri to use
+     */
+    void setManagementURI(URI uri);
+
+    /**
+     * Sends a management packet; this may go out on a different address.
+     *
+     * @param packet the management packet to send
+     * @throws Exception
+     */
+    void downStreamManagement(Packet packet) throws Exception;
+}
\ No newline at end of file
Propchange:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=734779&view=auto
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
(added)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+
+import java.net.URI;
+
+/**
+ * Creates {@link Network} instances, choosing the implementation from the
+ * scheme of the supplied URI.
+ */
+public class NetworkFactory {
+
+    /**
+     * Creates the network for a location. The schemes <code>mcast</code> and
+     * <code>multicast</code> (case-insensitive) produce a
+     * {@link MulticastNetwork} whose URI is set to <code>location</code>.
+     *
+     * @param location the address of the network; must carry a scheme
+     * @return the network associated with the URI, or <code>null</code> when
+     *         the scheme is not recognised
+     * @throws IllegalArgumentException if location is null or has no scheme
+     * @throws Exception
+     */
+    public static Network get(URI location) throws Exception {
+        if (location == null) {
+            throw new IllegalArgumentException("Network location not specified");
+        }
+        // URI.getScheme() returns null for scheme-less (relative) URIs; fail
+        // with a clear message instead of a bare NullPointerException.
+        String scheme = location.getScheme();
+        if (scheme == null) {
+            throw new IllegalArgumentException("Network scheme not specified: [" + location + "]");
+        }
+        scheme = scheme.trim();
+        Network result = null;
+        if (scheme.equalsIgnoreCase("mcast") || scheme.equalsIgnoreCase("multicast")) {
+            result = new MulticastNetwork();
+            // Hand the location to the network so the returned instance is
+            // actually associated with the requested URI.
+            result.setURI(location);
+        }
+        return result;
+    }
+}
Propchange:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html?rev=734779&view=auto
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
(added)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A Transport that represents all reachable nodes.
+A <code>Network</code> can be a multicast address, be defined by a list of URLs,
+or use a central location service to determine where to locate channels.
+
+</body>
+</html>
\ No newline at end of file
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
Thu Jan 15 10:45:30 2009
@@ -29,7 +29,7 @@
* Compresses PacketData
*
*/
-public class CompressionProcessor extends ChainedProcessor {
+public class CompressionProcessor extends DefaultChainedProcessor {
private int compressionLimit = 8192;
private int compressionLevel = Deflater.BEST_COMPRESSION;
private class CompressionStream extends GZIPOutputStream {
Copied:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
(from r734428,
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java&r1=734428&r2=734779&rev=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
Thu Jan 15 10:45:30 2009
@@ -27,13 +27,13 @@
* Chains Processors together
*
*/
-public class ChainedProcessor extends BaseService implements Processor {
- private static final Log LOG = LogFactory.getLog(ChainedProcessor.class);
+public class DefaultChainedProcessor extends BaseService implements Processor {
+ private static final Log LOG =
LogFactory.getLog(DefaultChainedProcessor.class);
private Processor next;
private Processor prev;
protected ExceptionListener exceptionListener;
- protected ChainedProcessor() {
+ protected DefaultChainedProcessor() {
}
/**
@@ -50,17 +50,17 @@
*
*/
public void setEnd(Processor next) {
- ChainedProcessor target = this;
+ DefaultChainedProcessor target = this;
Processor n = getNext();
while (n != null) {
- if (n instanceof ChainedProcessor) {
- ChainedProcessor cn = (ChainedProcessor) n;
+ if (n instanceof DefaultChainedProcessor) {
+ DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
target = cn;
n = cn.getNext();
}
}
- if (next instanceof ChainedProcessor) {
- target.setNextChain((ChainedProcessor) next);
+ if (next instanceof DefaultChainedProcessor) {
+ target.setNextChain((DefaultChainedProcessor) next);
} else {
target.next = next;
}
@@ -87,12 +87,12 @@
*
* @param p
*/
- public void setNextChain(ChainedProcessor p) {
- ChainedProcessor target = this;
+ public void setNextChain(DefaultChainedProcessor p) {
+ DefaultChainedProcessor target = this;
Processor n = getNext();
while (n != null) {
- if (n instanceof ChainedProcessor) {
- ChainedProcessor cn = (ChainedProcessor) n;
+ if (n instanceof DefaultChainedProcessor) {
+ DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
target = cn;
n = cn.getNext();
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
Thu Jan 15 10:45:30 2009
@@ -30,7 +30,7 @@
*/
@SuppressWarnings("serial")
-public class FragmentationProcessor extends ChainedProcessor {
+public class FragmentationProcessor extends DefaultChainedProcessor {
private static final Log LOG =
LogFactory.getLog(FragmentationProcessor.class);
private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
private int maxCacheSize = 16 * 1024;
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
Thu Jan 15 10:45:30 2009
@@ -17,7 +17,7 @@
package org.apache.activeblaze.impl.reliable;
import java.util.Map;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.util.ObjectFinder;
import org.apache.activeblaze.util.PropertyUtil;
@@ -34,23 +34,23 @@
* @return the configured transport from its URI
* @throws Exception
*/
- public static ChainedProcessor get(String location) throws Exception {
- ChainedProcessor result = findReliable(location);
+ public static DefaultChainedProcessor get(String location) throws
Exception {
+ DefaultChainedProcessor result = findReliable(location);
configure(result, location);
return result;
}
- static void configure(ChainedProcessor transport, String location) throws
Exception {
+ static void configure(DefaultChainedProcessor transport, String location)
throws Exception {
Map<String, String> options = PropertyUtil.parseParameters(location);
PropertyUtil.setProperties(transport, options);
}
- private static ChainedProcessor findReliable(String location) throws
Exception {
+ private static DefaultChainedProcessor findReliable(String location)
throws Exception {
String scheme = PropertyUtil.stripBefore(location, '?');
if (scheme == null) {
throw new IllegalArgumentException("Reliability scheme not specified:
[" + location + "]");
}
- ChainedProcessor result = (ChainedProcessor)
OBJECT_FINDER.newInstance(scheme);
+ DefaultChainedProcessor result = (DefaultChainedProcessor)
OBJECT_FINDER.newInstance(scheme);
return result;
}
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
Thu Jan 15 10:45:30 2009
@@ -16,14 +16,14 @@
*/
package org.apache.activeblaze.impl.reliable.flow;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
/**
* Simple FlowControl
*
*/
-public class SimpleFlow extends ChainedProcessor {
+public class SimpleFlow extends DefaultChainedProcessor {
int maxWindowSize = 4 * 1024;
int windowSize = 0;
int pauseTime = 2;
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
Thu Jan 15 10:45:30 2009
@@ -16,14 +16,14 @@
*/
package org.apache.activeblaze.impl.reliable.simple;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.reliable.flow.SimpleFlow;
/**
* Very basic (none) reliability
*
*/
-public class SimpleReliableProcessor extends ChainedProcessor{
+public class SimpleReliableProcessor extends DefaultChainedProcessor{
private SimpleFlow simpleFlow;
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
Thu Jan 15 10:45:30 2009
@@ -17,7 +17,7 @@
package org.apache.activeblaze.impl.transport;
import java.net.SocketTimeoutException;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -25,7 +25,7 @@
* Thread associated with processing
*
*/
-public abstract class ThreadChainedProcessor extends ChainedProcessor
implements Runnable {
+public abstract class ThreadChainedProcessor extends DefaultChainedProcessor
implements Runnable {
private static final Log LOG =
LogFactory.getLog(ThreadChainedProcessor.class);
private int priority=Thread.NORM_PRIORITY;
private boolean daemon;
Modified:
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
(original)
+++
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
Thu Jan 15 10:45:30 2009
@@ -27,13 +27,13 @@
public class ChainedProcessorTest extends TestCase {
public void testStart() throws Exception {
final AtomicBoolean test = new AtomicBoolean();
- ChainedProcessor target = new ChainedProcessor() {
+ DefaultChainedProcessor target = new DefaultChainedProcessor() {
public boolean start() {
return test.getAndSet(true);
}
};
- ChainedProcessor A = new ChainedProcessor();
- ChainedProcessor B = new ChainedProcessor();
+ DefaultChainedProcessor A = new DefaultChainedProcessor();
+ DefaultChainedProcessor B = new DefaultChainedProcessor();
A.setNext(B);
A.setEnd(target);
A.start();
@@ -42,13 +42,13 @@
public void testStop() throws Exception {
final AtomicBoolean test = new AtomicBoolean();
- ChainedProcessor target = new ChainedProcessor() {
+ DefaultChainedProcessor target = new DefaultChainedProcessor() {
public boolean stop() {
return test.getAndSet(true);
}
};
- ChainedProcessor A = new ChainedProcessor();
- ChainedProcessor B = new ChainedProcessor();
+ DefaultChainedProcessor A = new DefaultChainedProcessor();
+ DefaultChainedProcessor B = new DefaultChainedProcessor();
A.setNext(B);
A.setEnd(target);
A.start();
@@ -58,15 +58,15 @@
public void testDownStream() throws Exception {
final AtomicBoolean test = new AtomicBoolean();
- ChainedProcessor target = new ChainedProcessor() {
+ DefaultChainedProcessor target = new DefaultChainedProcessor() {
public void downStream(Packet p) {
test.set(true);
}
};
- ChainedProcessor A = new ChainedProcessor();
- ChainedProcessor B = new ChainedProcessor();
- ChainedProcessor C = new ChainedProcessor();
- ChainedProcessor D = new ChainedProcessor();
+ DefaultChainedProcessor A = new DefaultChainedProcessor();
+ DefaultChainedProcessor B = new DefaultChainedProcessor();
+ DefaultChainedProcessor C = new DefaultChainedProcessor();
+ DefaultChainedProcessor D = new DefaultChainedProcessor();
A.setEnd(B);
A.setEnd(C);
A.setEnd(D);
@@ -79,15 +79,15 @@
public void testUpStream() throws Exception {
final AtomicBoolean test = new AtomicBoolean();
- ChainedProcessor target = new ChainedProcessor() {
+ DefaultChainedProcessor target = new DefaultChainedProcessor() {
public void upStream(Packet p) {
test.set(true);
}
};
- ChainedProcessor A = new ChainedProcessor();
- ChainedProcessor B = new ChainedProcessor();
- ChainedProcessor C = new ChainedProcessor();
- ChainedProcessor D = new ChainedProcessor();
+ DefaultChainedProcessor A = new DefaultChainedProcessor();
+ DefaultChainedProcessor B = new DefaultChainedProcessor();
+ DefaultChainedProcessor C = new DefaultChainedProcessor();
+ DefaultChainedProcessor D = new DefaultChainedProcessor();
target.setEnd(A);
A.setEnd(B);
A.setEnd(C);
Modified:
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
(original)
+++
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
Thu Jan 15 10:45:30 2009
@@ -18,7 +18,7 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
@@ -26,7 +26,7 @@
* Test Processor
*
*/
-public class TerminatedChainedProcessor extends ChainedProcessor {
+public class TerminatedChainedProcessor extends DefaultChainedProcessor {
private Packet result = null;
private List<Packet> list = new ArrayList<Packet>();