Repository: cassandra
Updated Branches:
  refs/heads/trunk 8ef71f3f2 -> 5ea2c6b8e
Optimize Paxos prepare and propose stage for local requests

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ea2c6b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ea2c6b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ea2c6b8

Branch: refs/heads/trunk
Commit: 5ea2c6b8e963a061e41f2dedec7955a21860f54a
Parents: 8ef71f3
Author: jaydeepkumar1984 <chovatia.jayd...@gmail.com>
Authored: Mon Sep 11 22:05:54 2017 -0700
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Thu Sep 28 16:50:32 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 62 +++++++++++++++++++-
 .../service/paxos/PrepareVerbHandler.java       |  8 ++-
 .../service/paxos/ProposeVerbHandler.java       |  8 ++-
 .../cassandra/service/PaxosStateTest.java       | 24 ++++++++
 5 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 081ed72..6d8d67e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
  * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
  * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
  * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index df39530..49bb4b6 100644
--- 
a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -68,7 +68,10 @@ import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.PrepareCallback; +import org.apache.cassandra.service.paxos.PrepareResponse; +import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeCallback; +import org.apache.cassandra.service.paxos.ProposeVerbHandler; import org.apache.cassandra.net.MessagingService.Verb; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; @@ -501,7 +504,34 @@ public class StorageProxy implements StorageProxyMBean PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); for (InetAddress target : endpoints) - MessagingService.instance().sendRR(message, target, callback); + { + if (canDoLocalRequest(target)) + { + StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable() + { + public void run() + { + try + { + MessageIn<PrepareResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(), + PrepareVerbHandler.doPrepare(toPrepare), + Collections.<String, byte[]>emptyMap(), + MessagingService.Verb.INTERNAL_RESPONSE, + MessagingService.current_version); + callback.response(message); + } + catch (Exception ex) + { + logger.error("Failed paxos prepare locally : {}", ex); + } + } + }); + } + else + { + MessagingService.instance().sendRR(message, target, callback); + } + } callback.await(); return callback; } @@ -512,8 +542,34 @@ public class StorageProxy implements StorageProxyMBean 
ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); for (InetAddress target : endpoints) - MessagingService.instance().sendRR(message, target, callback); - + { + if (canDoLocalRequest(target)) + { + StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable() + { + public void run() + { + try + { + MessageIn<Boolean> message = MessageIn.create(FBUtilities.getBroadcastAddress(), + ProposeVerbHandler.doPropose(proposal), + Collections.<String, byte[]>emptyMap(), + MessagingService.Verb.INTERNAL_RESPONSE, + MessagingService.current_version); + callback.response(message); + } + catch (Exception ex) + { + logger.error("Failed paxos propose locally : {}", ex); + } + } + }); + } + else + { + MessagingService.instance().sendRR(message, target, callback); + } + } callback.await(); if (callback.isSuccessful()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java index 50a537f..2750b76 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareVerbHandler.java @@ -28,10 +28,14 @@ import org.apache.cassandra.net.MessagingService; public class PrepareVerbHandler implements IVerbHandler<Commit> { + public static PrepareResponse doPrepare(Commit toPrepare) + { + return PaxosState.prepare(toPrepare); + } + public void doVerb(MessageIn<Commit> message, int id) { - PrepareResponse response = PaxosState.prepare(message.payload); - 
MessageOut<PrepareResponse> reply = new MessageOut<PrepareResponse>(MessagingService.Verb.REQUEST_RESPONSE, response, PrepareResponse.serializer); + MessageOut<PrepareResponse> reply = new MessageOut<PrepareResponse>(MessagingService.Verb.REQUEST_RESPONSE, doPrepare(message.payload), PrepareResponse.serializer); MessagingService.instance().sendReply(reply, id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java index 536ff5a..81c9017 100644 --- a/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java +++ b/src/java/org/apache/cassandra/service/paxos/ProposeVerbHandler.java @@ -29,10 +29,14 @@ import org.apache.cassandra.utils.BooleanSerializer; public class ProposeVerbHandler implements IVerbHandler<Commit> { + public static Boolean doPropose(Commit proposal) + { + return PaxosState.propose(proposal); + } + public void doVerb(MessageIn<Commit> message, int id) { - Boolean response = PaxosState.propose(message.payload); - MessageOut<Boolean> reply = new MessageOut<Boolean>(MessagingService.Verb.REQUEST_RESPONSE, response, BooleanSerializer.serializer); + MessageOut<Boolean> reply = new MessageOut<Boolean>(MessagingService.Verb.REQUEST_RESPONSE, doPropose(message.payload), BooleanSerializer.serializer); MessagingService.instance().sendReply(reply, id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ea2c6b8/test/unit/org/apache/cassandra/service/PaxosStateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/PaxosStateTest.java b/test/unit/org/apache/cassandra/service/PaxosStateTest.java index 6c12001..bd7a85f 100644 --- 
a/test/unit/org/apache/cassandra/service/PaxosStateTest.java +++ b/test/unit/org/apache/cassandra/service/PaxosStateTest.java @@ -18,8 +18,11 @@ package org.apache.cassandra.service; import java.nio.ByteBuffer; +import java.util.UUID; import com.google.common.collect.Iterables; +import org.apache.cassandra.service.paxos.PrepareVerbHandler; +import org.apache.cassandra.service.paxos.ProposeVerbHandler; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -100,4 +103,25 @@ public class PaxosStateTest { Util.assertEmpty(Util.cmd(cfs, key).build()); } + + @Test + public void testPrepareProposePaxos() throws Throwable + { + ColumnFamilyStore cfs = Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1"); + String key = "key" + System.nanoTime(); + ByteBuffer value = ByteBufferUtil.bytes(0); + RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), FBUtilities.timestampMicros(), key); + builder.clustering("a").add("val", value); + PartitionUpdate update = Iterables.getOnlyElement(builder.build().getPartitionUpdates()); + + // CFS should be empty initially + assertNoDataPresent(cfs, Util.dk(key)); + + UUID ballot = UUIDGen.getRandomTimeUUIDFromMicros(System.currentTimeMillis()); + + Commit commit = Commit.newPrepare(Util.dk(key), cfs.metadata(), ballot); + + assertTrue("paxos prepare stage failed", PrepareVerbHandler.doPrepare(commit).promised); + assertTrue("paxos propose stage failed", ProposeVerbHandler.doPropose(commit)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org