Avoid deadlock in migration stage

patch by Richard Low; reviewed by slebresne for CASSANDRA-3882


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2247054a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2247054a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2247054a

Branch: refs/heads/trunk
Commit: 2247054af87f995b530f688ca9e0952e745a2cf1
Parents: a5e0994
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Jun 1 12:52:45 2012 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Jun 1 12:52:45 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/service/MigrationManager.java |   49 ++++++++------
 2 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2247054a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index be970a5..b9f2ad1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -67,6 +67,7 @@
  * Try to stop all compaction upon Keyspace or ColumnFamily drop (CASSANDRA-4221)
  * (cql3) Allow keyspace properties to contain hyphens (CASSANDRA-4278)
  * (cql3) Correctly validate keyspace access in create table (CASSANDRA-4296)
+ * Avoid deadlock in migration stage (CASSANDRA-3882)
 Merged from 1.0:
  * Fix super columns bug where cache is not updated (CASSANDRA-4190)
  * fix maxTimestamp to include row tombstones (CASSANDRA-4116)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2247054a/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 93030cc..18ef298 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,8 +31,6 @@ import java.util.concurrent.Future;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +39,6 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
@@ -50,7 +47,7 @@ import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -398,29 +395,39 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
                                           ArrayUtils.EMPTY_BYTE_ARRAY,
                                           
Gossiper.instance.getVersion(endpoint));
 
-            int retries = 0;
-            while (retries < MIGRATION_REQUEST_RETRIES)
+            if (!FailureDetector.instance.isAlive(endpoint))
             {
-                if (!FailureDetector.instance.isAlive(endpoint))
-                {
-                    logger.error("Can't send migration request: node {} is 
down.", endpoint);
-                    return;
-                }
-
-                IAsyncResult iar = MessagingService.instance().sendRR(message, 
endpoint);
+                logger.error("Can't send migration request: node {} is down.", 
endpoint);
+                return;
+            }
 
-                try
+            IAsyncCallback cb = new IAsyncCallback()
+            {
+                @Override
+                public void response(Message message)
                 {
-                    byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
-
-                    DefsTable.mergeRemoteSchema(reply, message.getVersion());
-                    return;
+                    try
+                    {
+                        DefsTable.mergeRemoteSchema(message.getMessageBody(), 
message.getVersion());
+                    }
+                    catch (IOException e)
+                    {
+                        logger.error("IOException merging remote schema", e);
+                    }
+                    catch (ConfigurationException e)
+                    {
+                        logger.error("Configuration exception merging remote 
schema", e);
+                    }
                 }
-                catch(TimeoutException e)
+
+                @Override
+                public boolean isLatencyForSnitch()
                 {
-                    retries++;
+                    return false;
                 }
-            }
+            };
+
+            MessagingService.instance().sendRR(message, endpoint, cb);
         }
     }
 }

Reply via email to