Author: jbellis
Date: Fri Aug 13 16:58:49 2010
New Revision: 985283

URL: http://svn.apache.org/viewvc?rev=985283&view=rev
Log:
remove message deserialization stage, and uncap read/write stages. patch by jbellis; reviewed by Stu Hood for CASSANDRA-1358

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Aug 13 16:58:49 2010 @@ -11,6 +11,8 @@ initialization (CASSANDRA-1377) * fix errors in hard-coded bloom filter optKPerBucket by computing it algorithmically (CASSANDRA-1220 + * remove message deserialization stage, and uncap read/write stages + so slow reads/writes don't block gossip processing (CASSANDRA-1358) 0.6.4 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Aug 13 16:58:49 2010 @@ -70,7 +70,7 @@ public class StageManager numThreads, Integer.MAX_VALUE, TimeUnit.SECONDS, - new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()), + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name)); } @@ -82,7 +82,7 @@ public class StageManager numThreads, Integer.MAX_VALUE, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()), + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name)); } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 13 16:58:49 2010 @@ -156,8 +156,6 @@ public class DatabaseDescriptor throw new RuntimeException("Cannot locate " + STORAGE_CONF_FILE + " via storage-config system property or classpath lookup."); } - private static int stageQueueSize_ = 4096; - static { try @@ -1114,11 +1112,6 @@ public class DatabaseDescriptor return getCFMetaData(tableName, cfName).subcolumnComparator; } - public static int getStageQueueSize() - { - return stageQueueSize_; - } - /** * @return The absolute number of keys that should be cached per table. 
*/ Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Fri Aug 13 16:58:49 2010 @@ -26,6 +26,7 @@ import java.net.Socket; import org.apache.log4j.Logger; +import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.streaming.IncomingStreamReader; public class IncomingTcpConnection extends Thread @@ -70,7 +71,9 @@ public class IncomingTcpConnection exten int size = input.readInt(); byte[] contentBytes = new byte[size]; input.readFully(contentBytes); - MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes))); + + Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes))); + MessagingService.receive(message); } } catch (EOFException e) Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Fri Aug 13 16:58:49 2010 @@ -20,13 +20,16 @@ package org.apache.cassandra.net; import org.apache.log4j.Logger; +import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.StorageService; public class MessageDeliveryTask implements Runnable { + private static final Logger logger_ = Logger.getLogger(MessageDeliveryTask.class); + private Message message_; - private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class); - + private final long constructionTime_ = System.currentTimeMillis(); + public MessageDeliveryTask(Message message) { message_ = message; @@ -34,6 +37,12 @@ public class MessageDeliveryTask impleme public void run() { + if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout()) + { + MessagingService.incrementDroppedMessages(); + return; + } + StorageService.Verb verb = message_.getVerb(); IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb); assert verbHandler != null : "unknown verb " + verb; Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Fri Aug 13 16:58:49 2010 @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.net; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.log4j.Logger; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.sink.SinkManager; -import org.apache.cassandra.utils.WrappedRunnable; - -class MessageDeserializationTask extends WrappedRunnable -{ - private static Logger logger = Logger.getLogger(MessageDeserializationTask.class); - - private final ByteArrayInputStream bytes; - private final long constructionTime = System.currentTimeMillis(); - - MessageDeserializationTask(ByteArrayInputStream bytes) - { - this.bytes = bytes; - } - - public void runMayThrow() throws IOException - { - if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout()) - { - MessagingService.incrementDroppedMessages(); - return; - } - - Message message = Message.serializer().deserialize(new DataInputStream(bytes)); - message = SinkManager.processServerMessageSink(message); - MessagingService.receive(message); - } -} Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=985283&r1=985282&r2=985283&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 13 16:58:49 2010 @@ -18,21 +18,6 @@ package org.apache.cassandra.net; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.gms.IFailureDetectionEventListener; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.io.SerializerType; -import org.apache.cassandra.net.sink.SinkManager; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ExpiringMap; -import org.apache.cassandra.utils.GuidGenerator; -import org.apache.cassandra.utils.SimpleCondition; -import org.apache.log4j.Logger; -import org.cliffc.high_scale_lib.NonBlockingHashMap; - import java.io.IOError; import java.io.IOException; import java.net.InetAddress; @@ -47,12 +32,24 @@ import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Logger; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.io.SerializerType; +import org.apache.cassandra.net.sink.SinkManager; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ExpiringMap; +import org.apache.cassandra.utils.GuidGenerator; +import org.apache.cassandra.utils.SimpleCondition; +import org.cliffc.high_scale_lib.NonBlockingHashMap; + public class 
MessagingService { private static int version_ = 1; @@ -69,8 +66,8 @@ public class MessagingService /* Lookup table for registering message handlers based on the verb. */ private static Map<StorageService.Verb, IVerbHandler> verbHandlers_; - /* Thread pool to handle deserialization of messages read from the socket. */ - private static ExecutorService messageDeserializerExecutor_; + /* Thread pool to handle messages without a specialized stage */ + private static ExecutorService defaultExecutor_; /* Thread pool to handle messaging write activities */ private static ExecutorService streamExecutor_; @@ -105,13 +102,7 @@ public class MessagingService callbackMap_ = new ExpiringMap<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() ); taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() ); - // read executor puts messages to deserialize on this. - messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1, - Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")); + defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL"); streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL"); TimerTask logDropped = new TimerTask() @@ -354,8 +345,8 @@ public class MessagingService /** blocks until the processing pools are empty and done. 
*/ public static void waitFor() throws InterruptedException { - while (!messageDeserializerExecutor_.isTerminated()) - messageDeserializerExecutor_.awaitTermination(5, TimeUnit.SECONDS); + while (!defaultExecutor_.isTerminated()) + defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS); while (!streamExecutor_.isTerminated()) streamExecutor_.awaitTermination(5, TimeUnit.SECONDS); } @@ -373,7 +364,7 @@ public class MessagingService throw new IOError(e); } - messageDeserializerExecutor_.shutdownNow(); + defaultExecutor_.shutdownNow(); streamExecutor_.shutdownNow(); /* shut down the cachetables */ @@ -385,14 +376,16 @@ public class MessagingService public static void receive(Message message) { - Runnable runnable = new MessageDeliveryTask(message); + message = SinkManager.processServerMessageSink(message); + Runnable runnable = new MessageDeliveryTask(message); ExecutorService stage = StageManager.getStage(message.getMessageType()); + if (stage == null) { if (logger_.isDebugEnabled()) logger_.debug("Running " + message.getMessageType() + " on default stage"); - messageDeserializerExecutor_.execute(runnable); + defaultExecutor_.execute(runnable); } else { @@ -425,11 +418,6 @@ public class MessagingService return taskCompletionMap_.getAge(key); } - public static ExecutorService getDeserializationExecutor() - { - return messageDeserializerExecutor_; - } - public static void validateMagic(int magic) throws IOException { if (magic != PROTOCOL_MAGIC)