Author: jbellis
Date: Fri Aug 13 17:07:56 2010
New Revision: 985287

URL: http://svn.apache.org/viewvc?rev=985287&view=rev
Log:
merge from 0.6

Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-985244
+/cassandra/branches/cassandra-0.6:922689-985285
 /cassandra/trunk:978791
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 13 17:07:56 2010
@@ -70,6 +70,8 @@ dev
    initialization (CASSANDRA-1377)
  * fix errors in hard-coded bloom filter optKPerBucket by computing it
    algorithmically (CASSANDRA-1220
+ * remove message deserialization stage, and uncap read/write stages
+   so slow reads/writes don't block gossip processing (CASSANDRA-1358)
 
 
 0.6.4

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-985285
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-985285
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-985285
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-985285
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-985285
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java 
Fri Aug 13 17:07:56 2010
@@ -74,7 +74,7 @@ public class StageManager
                                                 numThreads,
                                                 KEEPALIVE,
                                                 TimeUnit.SECONDS,
-                                                new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+                                                new 
LinkedBlockingQueue<Runnable>(),
                                                 new NamedThreadFactory(name));
     }
     
@@ -86,7 +86,7 @@ public class StageManager
                                                      numThreads,
                                                      KEEPALIVE,
                                                      TimeUnit.SECONDS,
-                                                     new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+                                                     new 
LinkedBlockingQueue<Runnable>(),
                                                      new 
NamedThreadFactory(name));
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Fri Aug 13 17:07:56 2010
@@ -107,8 +107,6 @@ public class DatabaseDescriptor
         throw new ConfigurationException("Cannot locate " + STORAGE_CONF_FILE 
+ " on the classpath");
     }
 
-    private static int stageQueueSize_ = 4096;
-
     static
     {
         try
@@ -1008,11 +1006,6 @@ public class DatabaseDescriptor
         return getCFMetaData(tableName, cfName).subcolumnComparator;
     }
 
-    public static int getStageQueueSize()
-    {
-        return stageQueueSize_;
-    }
-
     public static AbstractReconciler getReconciler(String tableName, String 
cfName)
     {
         assert tableName != null;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
Fri Aug 13 17:07:56 2010
@@ -27,6 +27,7 @@ import java.net.Socket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
 
@@ -76,7 +77,9 @@ public class IncomingTcpConnection exten
                     int size = input.readInt();
                     byte[] contentBytes = new byte[size];
                     input.readFully(contentBytes);
-                    MessagingService.getDeserializationExecutor().submit(new 
MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                    
+                    Message message = Message.serializer().deserialize(new 
DataInputStream(new ByteArrayInputStream(contentBytes)));
+                    MessagingService.receive(message);
                 }
             }
             catch (EOFException e)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java 
Fri Aug 13 17:07:56 2010
@@ -21,13 +21,16 @@ package org.apache.cassandra.net;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.StorageService;
 
 public class MessageDeliveryTask implements Runnable
 {
+    private static final Logger logger_ = 
LoggerFactory.getLogger(MessageDeliveryTask.class);    
+
     private Message message_;
-    private static Logger logger_ = 
LoggerFactory.getLogger(MessageDeliveryTask.class);    
-    
+    private final long constructionTime_ = System.currentTimeMillis();
+
     public MessageDeliveryTask(Message message)
     {
         message_ = message;    
@@ -35,6 +38,12 @@ public class MessageDeliveryTask impleme
     
     public void run()
     { 
+        if (System.currentTimeMillis() >  constructionTime_ + 
DatabaseDescriptor.getRpcTimeout())
+        {
+            MessagingService.incrementDroppedMessages();
+            return;
+        }
+
         StorageService.Verb verb = message_.getVerb();
         IVerbHandler verbHandler = 
MessagingService.instance.getVerbHandler(verb);
         assert verbHandler != null : "unknown verb " + verb;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri 
Aug 13 17:07:56 2010
@@ -18,23 +18,6 @@
 
 package org.apache.cassandra.net;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.io.SerializerType;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamHeader;
-import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.SimpleCondition;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -47,13 +30,28 @@ import java.nio.channels.ServerSocketCha
 import java.security.MessageDigest;
 import java.util.EnumMap;
 import java.util.Map;
-import java.util.TimerTask;
 import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 public class MessagingService
 {
     private static int version_ = 1;
@@ -70,8 +68,8 @@ public class MessagingService
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
-    /* Thread pool to handle deserialization of messages read from the socket. 
*/
-    private static ExecutorService messageDeserializerExecutor_;
+    /* Thread pool to handle messages without a specialized stage */
+    private static ExecutorService defaultExecutor_;
     
     /* Thread pool to handle messaging write activities */
     private static ExecutorService streamExecutor_;
@@ -106,13 +104,7 @@ public class MessagingService
         callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()));
         taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 
* DatabaseDescriptor.getRpcTimeout()));
 
-        // read executor puts messages to deserialize on this.
-        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
-                                                                        
Runtime.getRuntime().availableProcessors(),
-                                                                        
StageManager.KEEPALIVE,
-                                                                        
TimeUnit.SECONDS,
-                                                                        new 
LinkedBlockingQueue<Runnable>(),
-                                                                        new 
NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+        defaultExecutor_ = new 
JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
 
         streamExecutor_ = new 
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
         TimerTask logDropped = new TimerTask()
@@ -352,8 +344,8 @@ public class MessagingService
     /** blocks until the processing pools are empty and done. */
     public static void waitFor() throws InterruptedException
     {
-        while (!messageDeserializerExecutor_.isTerminated())
-            messageDeserializerExecutor_.awaitTermination(5, TimeUnit.SECONDS);
+        while (!defaultExecutor_.isTerminated())
+            defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS);
         while (!streamExecutor_.isTerminated())
             streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
     }
@@ -371,7 +363,7 @@ public class MessagingService
             throw new IOError(e);
         }
 
-        messageDeserializerExecutor_.shutdownNow();
+        defaultExecutor_.shutdownNow();
         streamExecutor_.shutdownNow();
 
         /* shut down the cachetables */
@@ -383,14 +375,16 @@ public class MessagingService
 
     public static void receive(Message message)
     {
-        Runnable runnable = new MessageDeliveryTask(message);
+        message = SinkManager.processServerMessageSink(message);
 
+        Runnable runnable = new MessageDeliveryTask(message);
         ExecutorService stage = 
StageManager.getStage(message.getMessageType());
+
         if (stage == null)
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Running " + message.getMessageType() + " on 
default stage");
-            messageDeserializerExecutor_.execute(runnable);
+            defaultExecutor_.execute(runnable);
         }
         else
         {
@@ -423,11 +417,6 @@ public class MessagingService
         return taskCompletionMap_.getAge(key);
     }
 
-    public static ExecutorService getDeserializationExecutor()
-    {
-        return messageDeserializerExecutor_;
-    }
-
     public static void validateMagic(int magic) throws IOException
     {
         if (magic != PROTOCOL_MAGIC)


Reply via email to