This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 0ef232f88b CEP-15: (Accord) SyncPoint timeouts become a Exhausted rather than a Timeout and doesn’t get retried 0ef232f88b is described below commit 0ef232f88b83a8d1bba6e406c9075e9e051d0241 Author: David Capwell <dcapw...@apache.org> AuthorDate: Tue Jun 18 12:20:22 2024 -0700 CEP-15: (Accord) SyncPoint timeouts become a Exhausted rather than a Timeout and doesn’t get retried patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-19718 --- modules/accord | 2 +- src/java/org/apache/cassandra/net/Verb.java | 4 +- .../cassandra/service/accord/AccordService.java | 78 ++++++--- .../accord/exceptions/ReadExhaustedException.java | 39 +++++ .../apache/cassandra/streaming/StreamSession.java | 10 +- src/java/org/apache/cassandra/utils/Blocking.java | 42 +++++ .../test/accord/AccordBootstrapTest.java | 2 +- .../apache/cassandra/repair/LocalSyncTaskTest.java | 2 +- .../service/accord/AccordServiceTest.java | 185 +++++++++++++++++++++ 9 files changed, 338 insertions(+), 26 deletions(-) diff --git a/modules/accord b/modules/accord index 37c957c719..f1f5ea5ccb 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 37c957c719491634f081b39900ebf708079ef3ee +Subproject commit f1f5ea5ccbd6e0a8abf579a4331fa84a1b3d9f95 diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index eb244f1bc6..151b59c8df 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -335,8 +335,8 @@ public enum Verb ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_EPHMRL_READ_DEPS_RSP), ACCORD_GET_MAX_CONFLICT_RSP (163, P2, writeTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.reply, RESPONSE_HANDLER ), ACCORD_GET_MAX_CONFLICT_REQ (164, P2, writeTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_MAX_CONFLICT_RSP), - ACCORD_FETCH_DATA_RSP (145, P2, repairTimeout,IMMEDIATE, () -> FetchSerializers.reply, RESPONSE_HANDLER ), - ACCORD_FETCH_DATA_REQ (146, P2, repairTimeout,IMMEDIATE, () -> FetchSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), + ACCORD_FETCH_DATA_RSP (145, P2, writeTimeout,IMMEDIATE, () -> FetchSerializers.reply, RESPONSE_HANDLER ), + ACCORD_FETCH_DATA_REQ (146, P2, writeTimeout,IMMEDIATE, () -> FetchSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), ACCORD_SET_SHARD_DURABLE_REQ (147, P2, writeTimeout, IMMEDIATE, () -> SetDurableSerializers.shardDurable, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE, () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.reply, RESPONSE_HANDLER ), diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 8764addbf5..d3954a208a 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -35,10 +35,14 @@ import javax.annotation.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; import accord.coordinate.Barrier; import accord.coordinate.CoordinateSyncPoint; +import accord.coordinate.Exhausted; +import accord.coordinate.FailureAccumulator; import accord.coordinate.TopologyMismatch; import accord.impl.CoordinateDurabilityScheduling; import accord.primitives.SyncPoint; @@ -47,6 +51,7 @@ import org.apache.cassandra.cql3.statements.RequestValidations; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.accord.exceptions.ReadExhaustedException; import org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.service.accord.repair.RepairSyncPointAdapter; @@ -392,6 +397,13 @@ public class AccordService implements IAccordService, Shutdownable // Protocol also doesn't have a way to denote "unknown" outcome, so using a timeout as the closest match throw newBarrierPreempted(txnId, barrierType.global); } + if (cause instanceof Exhausted) + { + // this case happens when a non-timeout exception is seen, and we are unable to move forward + metrics.failures.mark(); + throw newBarrierExhausted(txnId, barrierType.global); + } + // unknown error metrics.failures.mark(); throw new RuntimeException(cause); } @@ -430,35 +442,40 @@ public class AccordService implements IAccordService, Shutdownable return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, barrierType, isForWrite, repairSyncPoint(allNodes)); } - private static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean global) + @VisibleForTesting + static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean global) { return new ReadTimeoutException(global ? ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString()); } - private static ReadTimeoutException newBarrierPreempted(TxnId txnId, boolean global) + @VisibleForTesting + static ReadTimeoutException newBarrierPreempted(TxnId txnId, boolean global) { return new ReadPreemptedException(global ? ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString()); } - private long doWithRetries(LongSupplier action, int retryAttempts, long initialBackoffMillis, long maxBackoffMillis) throws InterruptedException + @VisibleForTesting + static ReadExhaustedException newBarrierExhausted(TxnId txnId, boolean global) + { + //TODO (usability): not being able to show the txn is a bad UX, this becomes harder to trace back in logs + return new ReadExhaustedException(global ? ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, ImmutableMap.of()); + } + + @VisibleForTesting + static boolean isTimeout(Throwable t) + { + return t instanceof Timeout || t instanceof ReadTimeoutException || t instanceof Preempted || t instanceof ReadPreemptedException; + } + + @VisibleForTesting + static long doWithRetries(Blocking blocking, LongSupplier action, int retryAttempts, long initialBackoffMillis, long maxBackoffMillis) throws InterruptedException { // Since we could end up having the barrier transaction or the transaction it listens to invalidated - RuntimeException existingFailures = null; + Throwable existingFailures = null; Long success = null; - long backoffMillis = 0; + long backoffMillis = initialBackoffMillis; for (int attempt = 0; attempt < retryAttempts; attempt++) { - try - { - Thread.sleep(backoffMillis); - } - catch (InterruptedException e) - { - if (existingFailures != null) - e.addSuppressed(existingFailures); - throw e; - } - backoffMillis = backoffMillis == 0 ? initialBackoffMillis : Math.min(backoffMillis * 2, maxBackoffMillis); try { success = action.getAsLong(); @@ -466,13 +483,34 @@ public class AccordService implements IAccordService, Shutdownable } catch (RequestExecutionException | CoordinationFailed newFailures) { - existingFailures = Throwables.merge(existingFailures, newFailures); + existingFailures = FailureAccumulator.append(existingFailures, newFailures, AccordService::isTimeout); + + try + { + blocking.sleep(backoffMillis); + } + catch (InterruptedException e) + { + if (existingFailures != null) + e.addSuppressed(existingFailures); + throw e; + } + backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); + } + catch (Throwable t) + { + // if an unknown/unexpected error happens retry stops right away + if (existingFailures != null) + t.addSuppressed(existingFailures); + existingFailures = t; + break; } } if (success == null) { checkState(existingFailures != null, "Didn't have success, but also didn't have failures"); - throw existingFailures; + Throwables.throwIfUnchecked(existingFailures); + throw new RuntimeException(existingFailures); } return success; } @@ -480,7 +518,7 @@ public class AccordService implements IAccordService, Shutdownable @Override public long barrierWithRetries(Seekables keysOrRanges, long minEpoch, BarrierType barrierType, boolean isForWrite) throws InterruptedException { - return doWithRetries(() -> AccordService.instance().barrier(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite), + return doWithRetries(Blocking.Default.instance, () -> AccordService.instance().barrier(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite), DatabaseDescriptor.getAccordBarrierRetryAttempts(), DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(), DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis()); @@ -489,7 +527,7 @@ public class AccordService implements IAccordService, Shutdownable @Override public long repairWithRetries(Seekables keysOrRanges, long minEpoch, BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> allEndpoints) throws InterruptedException { - return doWithRetries(() -> AccordService.instance().repair(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite, allEndpoints), + return doWithRetries(Blocking.Default.instance, () -> AccordService.instance().repair(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite, allEndpoints), DatabaseDescriptor.getAccordBarrierRetryAttempts(), DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(), DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis()); diff --git a/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java new file mode 100644 index 0000000000..4ebfc8fdb0 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord.exceptions; + +import java.util.Map; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; + +public class ReadExhaustedException extends ReadFailureException +{ + public ReadExhaustedException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) + { + super(consistency, received, blockFor, dataPresent, failureReasonByEndpoint); + } + + protected ReadExhaustedException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) + { + super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint); + } +} diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 050e37c749..447621c78c 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -1306,8 +1306,16 @@ public class StreamSession return requests.size(); } - @VisibleForTesting public int getNumTransfers() + { + return transfers.size(); + } + + //TODO (now, review): there were 2 tests that use this (nothing else) and both are checking that its > 1... but in both cases they are checking if there are transfer tasks, but there isn't any as the range doesn't have data... + // This looks like AccordBootstrapTest and LocalSyncTaskTest have a test bug, so rather than fixing this method was created to keep the old semantic... + @Deprecated(since = "5.1") + @VisibleForTesting + public int getNumKeyspaceTransfers() { return transferredRangesPerKeyspace.size(); } diff --git a/src/java/org/apache/cassandra/utils/Blocking.java b/src/java/org/apache/cassandra/utils/Blocking.java new file mode 100644 index 0000000000..e04e53b090 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Blocking.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.util.concurrent.TimeUnit; + +public interface Blocking +{ + default void sleep(long millis) throws InterruptedException + { + sleep(millis, TimeUnit.MILLISECONDS); + } + + void sleep(long value, TimeUnit unit) throws InterruptedException; + + enum Default implements Blocking + { + instance; + + @Override + public void sleep(long value, TimeUnit unit) throws InterruptedException + { + unit.sleep(value); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java index 2241a8c911..14ae7d1cde 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java @@ -267,7 +267,7 @@ public class AccordBootstrapTest extends TestBaseImpl StreamListener.listener.forSession(session -> { Assert.assertEquals(node3Addr, session.peer.getAddress()); Assert.assertEquals(0, session.getNumRequests()); - Assert.assertTrue(session.getNumTransfers() > 0); + Assert.assertTrue(session.getNumKeyspaceTransfers() > 0); }); awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> { diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 95f630dc05..c03fe2aac7 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -171,7 +171,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest StreamCoordinator coordinator = plan.getCoordinator(); StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions()); assertEquals(expectedIncoming, session.getNumRequests()); - assertEquals(expectedOutgoing, session.getNumTransfers()); + assertEquals(expectedOutgoing, session.getNumKeyspaceTransfers()); } @Test diff --git a/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java new file mode 100644 index 0000000000..ea88d11977 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.LongSupplier; + +import org.junit.Test; + +import accord.coordinate.Exhausted; +import accord.coordinate.Preempted; +import accord.coordinate.Timeout; +import accord.primitives.TxnId; +import org.apache.cassandra.utils.Blocking; +import org.assertj.core.api.Condition; +import org.mockito.Mockito; + +import static accord.utils.Property.qt; +import static org.apache.cassandra.service.accord.AccordService.doWithRetries; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class AccordServiceTest +{ + @Test + public void retryExpectedFailures() throws InterruptedException + { + Blocking blocking = Mockito.mock(Blocking.class); + class Task implements LongSupplier + { + private int attempts = 0; + + @Override + public long getAsLong() + { + switch (attempts) + { + case 0: + attempts++; + throw new Timeout(null, null); + case 1: + attempts++; + throw AccordService.newBarrierTimeout(TxnId.NONE, true); + case 2: + attempts++; + throw new Preempted(null, null); + case 3: + attempts++; + throw AccordService.newBarrierPreempted(TxnId.NONE, true); + case 4: + attempts++; + throw new Exhausted(null, null); + case 5: + attempts++; + throw AccordService.newBarrierExhausted(TxnId.NONE, true); + default: + return 42; + } + } + } + Task failing = new Task(); + assertThat(doWithRetries(blocking, failing, Integer.MAX_VALUE, 100, 1000)).isEqualTo(42); + verify(blocking).sleep(100); + verify(blocking).sleep(200); + verify(blocking).sleep(400); + verify(blocking).sleep(800); + verify(blocking, times(2)).sleep(1000); // hit max backoff, so stays at 1k + } + + @Test + public void retryThrowsTimeout() + { + Blocking blocking = Mockito.mock(Blocking.class); + qt().check(rs -> { + List<Runnable> timeoutFailures = new ArrayList<>(4); + timeoutFailures.add(() -> {throw new Timeout(null, null);}); + timeoutFailures.add(() -> {throw AccordService.newBarrierTimeout(TxnId.NONE, true);}); + timeoutFailures.add(() -> {throw new Preempted(null, null);}); + timeoutFailures.add(() -> {throw AccordService.newBarrierPreempted(TxnId.NONE, true);}); + Collections.shuffle(timeoutFailures, rs.asJdkRandom()); + Iterator<Runnable> it = timeoutFailures.iterator(); + LongSupplier failing = () -> { + if (!it.hasNext()) throw new IllegalStateException("Called too many times"); + it.next().run(); // this throws... + return 42; + }; + assertThatThrownBy(() -> doWithRetries(blocking, failing, timeoutFailures.size(), 100, 1000)).is(new Condition<>(AccordService::isTimeout, "timeout")); + assertThat(it).isExhausted(); + }); + } + + @Test + public void retryThrowsNonTimeout() + { + Blocking blocking = Mockito.mock(Blocking.class); + qt().check(rs -> { + List<Runnable> timeoutFailures = new ArrayList<>(5); + timeoutFailures.add(() -> {throw new Timeout(null, null);}); + timeoutFailures.add(() -> {throw AccordService.newBarrierTimeout(TxnId.NONE, true);}); + timeoutFailures.add(() -> {throw new Preempted(null, null);}); + timeoutFailures.add(() -> {throw AccordService.newBarrierPreempted(TxnId.NONE, true);}); + timeoutFailures.add(() -> {throw new Exhausted(null, null);}); + Collections.shuffle(timeoutFailures, rs.asJdkRandom()); + Iterator<Runnable> it = timeoutFailures.iterator(); + LongSupplier failing = () -> { + if (!it.hasNext()) throw new IllegalStateException("Called too many times"); + it.next().run(); // this throws... + return 42; + }; + assertThatThrownBy(() -> doWithRetries(blocking, failing, timeoutFailures.size(), 100, 1000)).isInstanceOf(Exhausted.class); + assertThat(it).isExhausted(); + }); + } + + @Test + public void retryShortCircuitError() + { + class Unexpected implements Runnable + { + final boolean isError; + + Unexpected(boolean isError) + { + this.isError = isError; + } + + @Override + public void run() + { + if (isError) throw new AssertionError(); + throw new NullPointerException(); + } + } + qt().check(rs -> { + List<Runnable> failures = new ArrayList<>(6); + failures.add(() -> {throw new Timeout(null, null);}); + failures.add(() -> {throw AccordService.newBarrierTimeout(TxnId.NONE, true);}); + failures.add(() -> {throw new Preempted(null, null);}); + failures.add(() -> {throw AccordService.newBarrierPreempted(TxnId.NONE, true);}); + failures.add(() -> {throw new Exhausted(null, null);}); + boolean isError = rs.nextBoolean(); + failures.add(new Unexpected(isError)); + Collections.shuffle(failures, rs.asJdkRandom()); + int unexpectedIndex = -1; + for (int i = 0; i < failures.size(); i++) + { + if (failures.get(i) instanceof Unexpected) + { + unexpectedIndex = i; + break; + } + } + Iterator<Runnable> it = failures.iterator(); + LongSupplier failing = () -> { + if (!it.hasNext()) throw new IllegalStateException("Called too many times"); + it.next().run(); // this throws... + return 42; + }; + Blocking blocking = Mockito.mock(Blocking.class); + assertThatThrownBy(() -> doWithRetries(blocking, failing, failures.size(), 100, 1000)).isInstanceOf(isError ? AssertionError.class : NullPointerException.class); + verify(blocking, times(unexpectedIndex)).sleep(Mockito.anyLong()); + }); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org