Author: jbellis
Date: Mon Nov 29 22:28:25 2010
New Revision: 1040325

URL: http://svn.apache.org/viewvc?rev=1040325&view=rev
Log:
clean up SinkManager
patch by jbellis

Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
 Mon Nov 29 22:28:25 2010
@@ -33,6 +33,7 @@ public class MessageDeliveryTask impleme
 
     public MessageDeliveryTask(Message message)
     {
+        assert message != null;
         message_ = message;    
     }
     

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 Mon Nov 29 22:28:25 2010
@@ -305,7 +305,7 @@ public class MessagingService implements
         }
 
         // message sinks are a testing hook
-        Message processedMessage = 
SinkManager.processClientMessageSink(message, to);
+        Message processedMessage = SinkManager.processClientMessage(message, 
to);
         if (processedMessage == null)
         {
             return;
@@ -385,11 +385,13 @@ public class MessagingService implements
 
     public static void receive(Message message)
     {
-        message = SinkManager.processServerMessageSink(message, null);
+        message = SinkManager.processServerMessage(message);
+        if (message == null)
+            return;
 
         Runnable runnable = new MessageDeliveryTask(message);
         ExecutorService stage = 
StageManager.getStage(message.getMessageType());
-        assert stage != null;
+        assert stage != null : "No stage for message type " + 
message.getMessageType();
         stage.execute(runnable);
     }
 

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
 Mon Nov 29 22:28:25 2010
@@ -18,56 +18,50 @@
 
 package org.apache.cassandra.net.sink;
 
-import java.util.*;
-import java.io.IOException;
-
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.cassandra.net.Message;
 
 public class SinkManager
 {
-    private static LinkedList<IMessageSink> messageSinks_ = new 
LinkedList<IMessageSink>();
+    private static List<IMessageSink> sinks = new ArrayList<IMessageSink>();
 
-    public static boolean isInitialized()
+    public static void addSink(IMessageSink ms)
     {
-        return ( messageSinks_.size() > 0 );
+        sinks.add(ms);
     }
 
-    public static void addMessageSink(IMessageSink ms)
+    public static void clear()
     {
-        messageSinks_.addLast(ms);
-    }
-    
-    public static void clearSinks(){
-        messageSinks_.clear();
+        sinks.clear();
     }
 
-    public static Message processClientMessageSink(Message message, 
InetAddress to)
+    public static Message processClientMessage(Message message, InetAddress to)
     {
-        ListIterator<IMessageSink> li = messageSinks_.listIterator();
-        while ( li.hasNext() )
+        if (sinks.isEmpty())
+            return message;
+
+        for (IMessageSink ms : sinks)
         {
-            IMessageSink ms = li.next();
             message = ms.handleMessage(message, to);
-            if ( message == null )
-            {
+            if (message == null)
                 return null;
-            }
         }
         return message;
     }
 
-    public static Message processServerMessageSink(Message message, 
InetAddress to)
+    public static Message processServerMessage(Message message)
     {
-        ListIterator<IMessageSink> li = 
messageSinks_.listIterator(messageSinks_.size());
-        while ( li.hasPrevious() )
+        if (sinks.isEmpty())
+            return message;
+
+        for (IMessageSink ms : sinks)
         {
-            IMessageSink ms = li.previous();
-            message = ms.handleMessage(message, to);
-            if ( message == null )
-            {
+            message = ms.handleMessage(message, null);
+            if (message == null)
                 return null;
-            }
         }
         return message;
     }

Modified: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
 Mon Nov 29 22:28:25 2010
@@ -83,7 +83,7 @@ public class RemoveTest extends CleanupH
     @After
     public void tearDown()
     {
-        SinkManager.clearSinks();
+        SinkManager.clear();
         MessagingService.shutdown();
         ss.setPartitionerUnsafe(oldPartitioner);
     }
@@ -111,7 +111,7 @@ public class RemoveTest extends CleanupH
 
         final String token = 
partitioner.getTokenFactory().toString(endpointTokens.get(5));
         ReplicationSink rSink = new ReplicationSink();
-        SinkManager.addMessageSink(rSink);
+        SinkManager.addSink(rSink);
 
         // start removal in background and send replication confirmations
         final AtomicBoolean success = new AtomicBoolean(false);
@@ -159,8 +159,8 @@ public class RemoveTest extends CleanupH
 
         NotificationSink nSink = new NotificationSink();
         ReplicationSink rSink = new ReplicationSink();
-        SinkManager.addMessageSink(nSink);
-        SinkManager.addMessageSink(rSink);
+        SinkManager.addSink(nSink);
+        SinkManager.addSink(rSink);
 
         assertEquals(0, tmd.getLeavingEndpoints().size());
 


Reply via email to