Author: jbellis
Date: Mon Nov 29 22:28:25 2010
New Revision: 1040325
URL: http://svn.apache.org/viewvc?rev=1040325&view=rev
Log:
clean up SinkManager
patch by jbellis
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1040325&r1=1040324&r2=1040325&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Mon Nov 29 22:28:25 2010 @@ -33,6 +33,7 @@ public class MessageDeliveryTask impleme public MessageDeliveryTask(Message message) { + assert message != null; message_ = message; } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1040325&r1=1040324&r2=1040325&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov 29 22:28:25 2010 @@ -305,7 +305,7 @@ public class MessagingService implements } // message sinks are a testing hook - Message processedMessage = SinkManager.processClientMessageSink(message, to); + Message processedMessage = SinkManager.processClientMessage(message, to); if (processedMessage == null) { return; @@ -385,11 +385,13 @@ public class 
MessagingService implements public static void receive(Message message) { - message = SinkManager.processServerMessageSink(message, null); + message = SinkManager.processServerMessage(message); + if (message == null) + return; Runnable runnable = new MessageDeliveryTask(message); ExecutorService stage = StageManager.getStage(message.getMessageType()); - assert stage != null; + assert stage != null : "No stage for message type " + message.getMessageType(); stage.execute(runnable); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1040325&r1=1040324&r2=1040325&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java Mon Nov 29 22:28:25 2010 @@ -18,56 +18,50 @@ package org.apache.cassandra.net.sink; -import java.util.*; -import java.io.IOException; - import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + import org.apache.cassandra.net.Message; public class SinkManager { - private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>(); + private static List<IMessageSink> sinks = new ArrayList<IMessageSink>(); - public static boolean isInitialized() + public static void addSink(IMessageSink ms) { - return ( messageSinks_.size() > 0 ); + sinks.add(ms); } - public static void addMessageSink(IMessageSink ms) + public static void clear() { - messageSinks_.addLast(ms); - } - - public static void clearSinks(){ - messageSinks_.clear(); + sinks.clear(); } - public static Message processClientMessageSink(Message message, InetAddress to) + public static Message processClientMessage(Message message, InetAddress to) { - 
ListIterator<IMessageSink> li = messageSinks_.listIterator(); - while ( li.hasNext() ) + if (sinks.isEmpty()) + return message; + + for (IMessageSink ms : sinks) { - IMessageSink ms = li.next(); message = ms.handleMessage(message, to); - if ( message == null ) - { + if (message == null) return null; - } } return message; } - public static Message processServerMessageSink(Message message, InetAddress to) + public static Message processServerMessage(Message message) { - ListIterator<IMessageSink> li = messageSinks_.listIterator(messageSinks_.size()); - while ( li.hasPrevious() ) + if (sinks.isEmpty()) + return message; + + for (IMessageSink ms : sinks) { - IMessageSink ms = li.previous(); - message = ms.handleMessage(message, to); - if ( message == null ) - { + message = ms.handleMessage(message, null); + if (message == null) return null; - } } return message; } Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1040325&r1=1040324&r2=1040325&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java (original) +++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java Mon Nov 29 22:28:25 2010 @@ -83,7 +83,7 @@ public class RemoveTest extends CleanupH @After public void tearDown() { - SinkManager.clearSinks(); + SinkManager.clear(); MessagingService.shutdown(); ss.setPartitionerUnsafe(oldPartitioner); } @@ -111,7 +111,7 @@ public class RemoveTest extends CleanupH final String token = partitioner.getTokenFactory().toString(endpointTokens.get(5)); ReplicationSink rSink = new ReplicationSink(); - SinkManager.addMessageSink(rSink); + SinkManager.addSink(rSink); // start removal in background and send replication confirmations final AtomicBoolean 
success = new AtomicBoolean(false); @@ -159,8 +159,8 @@ public class RemoveTest extends CleanupH NotificationSink nSink = new NotificationSink(); ReplicationSink rSink = new ReplicationSink(); - SinkManager.addMessageSink(nSink); - SinkManager.addMessageSink(rSink); + SinkManager.addSink(nSink); + SinkManager.addSink(rSink); assertEquals(0, tmd.getLeavingEndpoints().size());