Author: jbellis
Date: Fri Aug 13 17:07:56 2010
New Revision: 985287
URL: http://svn.apache.org/viewvc?rev=985287&view=rev
Log: merge from 0.6
Removed: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Aug 13 17:07:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6:922689-985244 +/cassandra/branches/cassandra-0.6:922689-985285 /cassandra/trunk:978791 /incubator/cassandra/branches/cassandra-0.3:774578-796573 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=985287&r1=985286&r2=985287&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Aug 13 17:07:56 2010 @@ -70,6 +70,8 @@ dev initialization (CASSANDRA-1377) * fix errors in hard-coded bloom filter optKPerBucket by computing it algorithmically (CASSANDRA-1220 + * remove 
message deserialization stage, and uncap read/write stages + so slow reads/writes don't block gossip processing (CASSANDRA-1358) 0.6.4 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Aug 13 17:07:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-985244 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-985285 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Aug 13 17:07:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-985244 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-985285 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java 
------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Aug 13 17:07:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-985244 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-985285 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Aug 13 17:07:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-985244 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-985285 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Aug 13 17:07:56 2010 @@ -1,4 +1,4 @@ 
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-985244 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-985285 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350 Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=985287&r1=985286&r2=985287&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Aug 13 17:07:56 2010 @@ -74,7 +74,7 @@ public class StageManager numThreads, KEEPALIVE, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()), + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name)); } @@ -86,7 +86,7 @@ public class StageManager numThreads, KEEPALIVE, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()), + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name)); } Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=985287&r1=985286&r2=985287&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 13 17:07:56 2010 @@ -107,8 +107,6 @@ public class DatabaseDescriptor throw new ConfigurationException("Cannot locate " + STORAGE_CONF_FILE + " on the classpath"); } - private static int stageQueueSize_ = 4096; - static { try @@ -1008,11 +1006,6 @@ public class DatabaseDescriptor return getCFMetaData(tableName, cfName).subcolumnComparator; } - public static int getStageQueueSize() - { - return stageQueueSize_; - } - public static AbstractReconciler getReconciler(String tableName, String cfName) { assert tableName != null; Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=985287&r1=985286&r2=985287&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Fri Aug 13 17:07:56 2010 @@ -27,6 +27,7 @@ import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.streaming.IncomingStreamReader; import org.apache.cassandra.streaming.StreamHeader; @@ -76,7 +77,9 @@ public class IncomingTcpConnection exten int size = input.readInt(); byte[] contentBytes = new byte[size]; input.readFully(contentBytes); - MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes))); + + Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes))); + MessagingService.receive(message); } } catch (EOFException e) Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=985287&r1=985286&r2=985287&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Fri Aug 13 17:07:56 2010 @@ -21,13 +21,16 @@ package org.apache.cassandra.net; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.StorageService; public class MessageDeliveryTask implements Runnable { + private static final Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class); + private Message message_; - private static Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class); - + private final long constructionTime_ = System.currentTimeMillis(); + public MessageDeliveryTask(Message message) { message_ = message; @@ -35,6 +38,12 @@ public class MessageDeliveryTask impleme public void run() { + if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout()) + { + MessagingService.incrementDroppedMessages(); + return; + } + StorageService.Verb verb = message_.getVerb(); IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb); assert verbHandler != null : "unknown verb " + verb; Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=985287&r1=985286&r2=985287&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 13 17:07:56 2010 @@ -18,23 +18,6 @@ package org.apache.cassandra.net; 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.io.SerializerType; -import org.apache.cassandra.net.sink.SinkManager; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.streaming.StreamHeader; -import org.apache.cassandra.utils.ExpiringMap; -import org.apache.cassandra.utils.GuidGenerator; -import org.apache.cassandra.utils.SimpleCondition; -import org.cliffc.high_scale_lib.NonBlockingHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOError; import java.io.IOException; import java.net.InetAddress; @@ -47,13 +30,28 @@ import java.nio.channels.ServerSocketCha import java.security.MessageDigest; import java.util.EnumMap; import java.util.Map; -import java.util.TimerTask; import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.io.SerializerType; +import org.apache.cassandra.net.sink.SinkManager; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamHeader; +import org.apache.cassandra.utils.ExpiringMap; +import org.apache.cassandra.utils.GuidGenerator; +import org.apache.cassandra.utils.SimpleCondition; +import org.cliffc.high_scale_lib.NonBlockingHashMap; + public class 
MessagingService { private static int version_ = 1; @@ -70,8 +68,8 @@ public class MessagingService /* Lookup table for registering message handlers based on the verb. */ private static Map<StorageService.Verb, IVerbHandler> verbHandlers_; - /* Thread pool to handle deserialization of messages read from the socket. */ - private static ExecutorService messageDeserializerExecutor_; + /* Thread pool to handle messages without a specialized stage */ + private static ExecutorService defaultExecutor_; /* Thread pool to handle messaging write activities */ private static ExecutorService streamExecutor_; @@ -106,13 +104,7 @@ public class MessagingService callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout())); taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 * DatabaseDescriptor.getRpcTimeout())); - // read executor puts messages to deserialize on this. - messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1, - Runtime.getRuntime().availableProcessors(), - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")); + defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL"); streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL"); TimerTask logDropped = new TimerTask() @@ -352,8 +344,8 @@ public class MessagingService /** blocks until the processing pools are empty and done. 
*/ public static void waitFor() throws InterruptedException { - while (!messageDeserializerExecutor_.isTerminated()) - messageDeserializerExecutor_.awaitTermination(5, TimeUnit.SECONDS); + while (!defaultExecutor_.isTerminated()) + defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS); while (!streamExecutor_.isTerminated()) streamExecutor_.awaitTermination(5, TimeUnit.SECONDS); } @@ -371,7 +363,7 @@ public class MessagingService throw new IOError(e); } - messageDeserializerExecutor_.shutdownNow(); + defaultExecutor_.shutdownNow(); streamExecutor_.shutdownNow(); /* shut down the cachetables */ @@ -383,14 +375,16 @@ public class MessagingService public static void receive(Message message) { - Runnable runnable = new MessageDeliveryTask(message); + message = SinkManager.processServerMessageSink(message); + Runnable runnable = new MessageDeliveryTask(message); ExecutorService stage = StageManager.getStage(message.getMessageType()); + if (stage == null) { if (logger_.isDebugEnabled()) logger_.debug("Running " + message.getMessageType() + " on default stage"); - messageDeserializerExecutor_.execute(runnable); + defaultExecutor_.execute(runnable); } else { @@ -423,11 +417,6 @@ public class MessagingService return taskCompletionMap_.getAge(key); } - public static ExecutorService getDeserializationExecutor() - { - return messageDeserializerExecutor_; - } - public static void validateMagic(int magic) throws IOException { if (magic != PROTOCOL_MAGIC)