Repository: cassandra
Updated Branches:
  refs/heads/trunk 8ef71f3f2 -> 5ea2c6b8e


Optimize Paxos prepare and propose stage for local requests


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ea2c6b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ea2c6b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ea2c6b8

Branch: refs/heads/trunk
Commit: 5ea2c6b8e963a061e41f2dedec7955a21860f54a
Parents: 8ef71f3
Author: jaydeepkumar1984 <chovatia.jayd...@gmail.com>
Authored: Mon Sep 11 22:05:54 2017 -0700
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Thu Sep 28 16:50:32 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 62 +++++++++++++++++++-
 .../service/paxos/PrepareVerbHandler.java       |  8 ++-
 .../service/paxos/ProposeVerbHandler.java       |  8 ++-
 .../cassandra/service/PaxosStateTest.java       | 24 ++++++++
 5 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 081ed72..6d8d67e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Optimize Paxos prepare and propose stage for local requests 
(CASSANDRA-13862)
  * Throttle base partitions during MV repair streaming to prevent OOM 
(CASSANDRA-13299)
  * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
  * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 
(CASSANDRA-13703)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index df39530..49bb4b6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -68,7 +68,10 @@ import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.PrepareCallback;
+import org.apache.cassandra.service.paxos.PrepareResponse;
+import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeCallback;
+import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
@@ -501,7 +504,34 @@ public class StorageProxy implements StorageProxyMBean
         PrepareCallback callback = new 
PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), 
requiredParticipants, consistencyForPaxos, queryStartNanoTime);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, 
Commit.serializer);
         for (InetAddress target : endpoints)
-            MessagingService.instance().sendRR(message, target, callback);
+        {
+            if (canDoLocalRequest(target))
+            {
+                
StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new
 Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            MessageIn<PrepareResponse> message = 
MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                    PrepareVerbHandler.doPrepare(toPrepare),
+                                    Collections.<String, byte[]>emptyMap(),
+                                    MessagingService.Verb.INTERNAL_RESPONSE,
+                                    MessagingService.current_version);
+                            callback.response(message);
+                        }
+                        catch (Exception ex)
+                        {
+                            logger.error("Failed paxos prepare locally : {}", 
ex);
+                        }
+                    }
+                });
+            }
+            else
+            {
+                MessagingService.instance().sendRR(message, target, callback);
+            }
+        }
         callback.await();
         return callback;
     }
@@ -512,8 +542,34 @@ public class StorageProxy implements StorageProxyMBean
         ProposeCallback callback = new ProposeCallback(endpoints.size(), 
requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, 
Commit.serializer);
         for (InetAddress target : endpoints)
-            MessagingService.instance().sendRR(message, target, callback);
-
+        {
+            if (canDoLocalRequest(target))
+            {
+                
StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new
 Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            MessageIn<Boolean> message = 
MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                    ProposeVerbHandler.doPropose(proposal),
+                                    Collections.<String, byte[]>emptyMap(),
+                                    MessagingService.Verb.INTERNAL_RESPONSE,
+                                    MessagingService.current_version);
+                            callback.response(message);
+                        }
+                        catch (Exception ex)
+                        {
+                            logger.error("Failed paxos propose locally : {}", 
ex);
+                        }
+                    }
+                });
+            }
+            else
+            {
+                MessagingService.instance().sendRR(message, target, callback);
+            }
+        }
         callback.await();
 
         if (callback.isSuccessful())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java 
b/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java
index 50a537f..2750b76 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java
@@ -28,10 +28,14 @@ import org.apache.cassandra.net.MessagingService;
 
 public class PrepareVerbHandler implements IVerbHandler<Commit>
 {
+    public static PrepareResponse doPrepare(Commit toPrepare)
+    {
+        return PaxosState.prepare(toPrepare);
+    }
+
     public void doVerb(MessageIn<Commit> message, int id)
     {
-        PrepareResponse response = PaxosState.prepare(message.payload);
-        MessageOut<PrepareResponse> reply = new 
MessageOut<PrepareResponse>(MessagingService.Verb.REQUEST_RESPONSE, response, 
PrepareResponse.serializer);
+        MessageOut<PrepareResponse> reply = new 
MessageOut<PrepareResponse>(MessagingService.Verb.REQUEST_RESPONSE, 
doPrepare(message.payload), PrepareResponse.serializer);
         MessagingService.instance().sendReply(reply, id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java 
b/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java
index 536ff5a..81c9017 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java
@@ -29,10 +29,14 @@ import org.apache.cassandra.utils.BooleanSerializer;
 
 public class ProposeVerbHandler implements IVerbHandler<Commit>
 {
+    public static Boolean doPropose(Commit proposal)
+    {
+        return PaxosState.propose(proposal);
+    }
+
     public void doVerb(MessageIn<Commit> message, int id)
     {
-        Boolean response = PaxosState.propose(message.payload);
-        MessageOut<Boolean> reply = new 
MessageOut<Boolean>(MessagingService.Verb.REQUEST_RESPONSE, response, 
BooleanSerializer.serializer);
+        MessageOut<Boolean> reply = new 
MessageOut<Boolean>(MessagingService.Verb.REQUEST_RESPONSE, 
doPropose(message.payload), BooleanSerializer.serializer);
         MessagingService.instance().sendReply(reply, id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/test/unit/org/apache/cassandra/service/PaxosStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/PaxosStateTest.java 
b/test/unit/org/apache/cassandra/service/PaxosStateTest.java
index 6c12001..bd7a85f 100644
--- a/test/unit/org/apache/cassandra/service/PaxosStateTest.java
+++ b/test/unit/org/apache/cassandra/service/PaxosStateTest.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.service;
 
 import java.nio.ByteBuffer;
+import java.util.UUID;
 
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.service.paxos.PrepareVerbHandler;
+import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -100,4 +103,25 @@ public class PaxosStateTest
     {
         Util.assertEmpty(Util.cmd(cfs, key).build());
     }
+
+    @Test
+    public void testPrepareProposePaxos() throws Throwable
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1");
+        String key = "key" + System.nanoTime();
+        ByteBuffer value = ByteBufferUtil.bytes(0);
+        RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 
FBUtilities.timestampMicros(), key);
+        builder.clustering("a").add("val", value);
+        PartitionUpdate update = 
Iterables.getOnlyElement(builder.build().getPartitionUpdates());
+
+        // CFS should be empty initially
+        assertNoDataPresent(cfs, Util.dk(key));
+
+        UUID ballot = 
UUIDGen.getRandomTimeUUIDFromMicros(System.currentTimeMillis());
+
+        Commit commit = Commit.newPrepare(Util.dk(key), cfs.metadata(), 
ballot);
+
+        assertTrue("paxos prepare stage failed", 
PrepareVerbHandler.doPrepare(commit).promised);
+        assertTrue("paxos propose stage failed", 
ProposeVerbHandler.doPropose(commit));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to