This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new f4b69ba0e8 Fix "open RT bound as its last item" exception f4b69ba0e8 is described below commit f4b69ba0e82bb051e56a92d792142034d9f617f0 Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Mon Sep 19 14:49:10 2022 -0400 Fix "open RT bound as its last item" exception Patch by Marcus Eriksson; reviewed by Aleksey Yeschenko and Josh McKenzie for CASSANDRA-17810 Co-authored-by: Marcus Eriksson <marc...@apache.org> Co-authored-by: Josh McKenzie <jmcken...@apache.org> --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ReadCommand.java | 125 ++++++++++++--------- .../cassandra/db/ReadCommandVerbHandler.java | 26 +++-- .../cassandra/db/transform/RTBoundValidator.java | 45 ++++---- .../exceptions/QueryCancelledException.java | 28 +++++ .../org/apache/cassandra/service/StorageProxy.java | 17 ++- .../distributed/test/TimeoutAbortTest.java | 62 ++++++++++ .../org/apache/cassandra/db/ReadCommandTest.java | 34 +++++- 8 files changed, 249 insertions(+), 89 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 60e5f976e5..bda1ba04d7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Fix "open RT bound as its last item" exception (CASSANDRA-17810) * Fix leak of non-standard Java types in JMX MBeans `org.apache.cassandra.db:type=StorageService` and `org.apache.cassandra.db:type=RepairService` as clients using JMX cannot handle them. More details in NEWS.txt (CASSANDRA-17668) * Deprecate Throwables.propagate usage (CASSANDRA-14218) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 358d408919..ae64710005 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.config.*; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.net.ParamType; import org.apache.cassandra.net.Verb; @@ -422,7 +423,8 @@ public abstract class ReadCommand extends AbstractReadQuery try { iterator = withQuerySizeTracking(iterator); - iterator = withStateTracking(iterator); + iterator = maybeSlowDownForTesting(iterator); + iterator = withQueryCancellation(iterator); iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false); iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); @@ -601,58 +603,6 @@ public abstract class ReadCommand extends AbstractReadQuery return Transformation.apply(iter, new MetricRecording()); } - protected class CheckForAbort extends StoppingTransformation<UnfilteredRowIterator> - { - long lastChecked = 0; - - protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) - { - if (maybeAbort()) - { - partition.close(); - return null; - } - - return Transformation.apply(partition, this); - } - - protected Row applyToRow(Row row) - { - if (TEST_ITERATION_DELAY_MILLIS > 0) - maybeDelayForTesting(); - - return maybeAbort() ? null : row; - } - - private boolean maybeAbort() - { - /** - * TODO: this is not a great way to abort early; why not expressly limit checks to 10ms intervals? - * The value returned by approxTime.now() is updated only every - * {@link org.apache.cassandra.utils.MonotonicClock.SampledClock.CHECK_INTERVAL_MS}, by default 2 millis. Since MonitorableImpl - * relies on approxTime, we don't need to check unless the approximate time has elapsed. - */ - if (lastChecked == approxTime.now()) - return false; - - lastChecked = approxTime.now(); - - if (isAborted()) - { - stop(); - return true; - } - - return false; - } - - private void maybeDelayForTesting() - { - if (!metadata().keyspace.startsWith("system")) - FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); - } - } - private boolean shouldTrackSize(DataStorageSpec.LongBytesBound warnThresholdBytes, DataStorageSpec.LongBytesBound abortThresholdBytes) { return trackWarnings @@ -737,9 +687,74 @@ public abstract class ReadCommand extends AbstractReadQuery return iterator; } - protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter) + private class QueryCancellationChecker extends StoppingTransformation<UnfilteredRowIterator> + { + long lastCheckedAt = 0; + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + maybeCancel(); + return Transformation.apply(partition, this); + } + + @Override + protected Row applyToRow(Row row) + { + maybeCancel(); + return row; + } + + private void maybeCancel() + { + /* + * The value returned by approxTime.now() is updated only every + * {@link org.apache.cassandra.utils.MonotonicClock.SampledClock.CHECK_INTERVAL_MS}, by default 2 millis. + * Since MonitorableImpl relies on approxTime, we don't need to check unless the approximate time has elapsed. + */ + if (lastCheckedAt == approxTime.now()) + return; + lastCheckedAt = approxTime.now(); + + if (isAborted()) + { + stop(); + throw new QueryCancelledException(ReadCommand.this); + } + } + } + + private UnfilteredPartitionIterator withQueryCancellation(UnfilteredPartitionIterator iter) + { + return Transformation.apply(iter, new QueryCancellationChecker()); + } + + /** + * A transformation used for simulating slow queries by tests. + */ + private static class DelayInjector extends Transformation<UnfilteredRowIterator> + { + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); + return Transformation.apply(partition, this); + } + + @Override + protected Row applyToRow(Row row) + { + FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); + return row; + } + } + + private UnfilteredPartitionIterator maybeSlowDownForTesting(UnfilteredPartitionIterator iter) { - return Transformation.apply(iter, new CheckForAbort()); + if (TEST_ITERATION_DELAY_MILLIS > 0 && !SchemaConstants.isSystemKeyspace(metadata().keyspace)) + return Transformation.apply(iter, new DelayInjector()); + else + return iter; } /** diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 92265687de..f693bbc7c3 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; @@ -77,18 +78,29 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> MessagingService.instance().send(reply, message.from()); return; } + catch (AssertionError t) + { + throw new AssertionError(String.format("Caught an error while trying to process the command: %s", command.toCQLString()), t); + } + catch (QueryCancelledException e) + { + logger.debug("Query cancelled (timeout)", e); + response = null; + assert !command.isCompleted() : "Read marked as completed despite being aborted by timeout to table " + command.metadata(); + } - if (!command.complete()) + if (command.complete()) + { + Tracing.trace("Enqueuing response to {}", message.from()); + Message<ReadResponse> reply = message.responseWith(response); + reply = MessageParams.addToMessage(reply); + MessagingService.instance().send(reply, message.from()); + } + else { Tracing.trace("Discarding partial response to {} (timed out)", message.from()); MessagingService.instance().metrics.recordDroppedMessage(message, message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS); - return; } - - Tracing.trace("Enqueuing response to {}", message.from()); - Message<ReadResponse> reply = message.responseWith(response); - reply = MessageParams.addToMessage(reply); - MessagingService.instance().send(reply, message.from()); } private void validateTransientStatus(Message<ReadCommand> message) diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java index eb37f4bc1c..e197ce20b7 100644 --- a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java +++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java @@ -17,11 +17,12 @@ */ package org.apache.cassandra.db.transform; +import java.util.Objects; + import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.RangeTombstoneMarker; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.schema.TableMetadata; /** * A validating transformation that sanity-checks the sequence of RT bounds and boundaries in every partition. @@ -51,29 +52,27 @@ public final class RTBoundValidator extends Transformation<UnfilteredRowIterator public static UnfilteredRowIterator validate(UnfilteredRowIterator partition, Stage stage, boolean enforceIsClosed) { - return Transformation.apply(partition, new RowsTransformation(stage, partition.metadata(), partition.isReverseOrder(), enforceIsClosed)); + return Transformation.apply(partition, new RowsTransformation(stage, partition, enforceIsClosed)); } @Override public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { - return Transformation.apply(partition, new RowsTransformation(stage, partition.metadata(), partition.isReverseOrder(), enforceIsClosed)); + return Transformation.apply(partition, new RowsTransformation(stage, partition, enforceIsClosed)); } private final static class RowsTransformation extends Transformation { private final Stage stage; - private final TableMetadata metadata; - private final boolean isReverseOrder; private final boolean enforceIsClosed; + private final UnfilteredRowIterator partition; private DeletionTime openMarkerDeletionTime; - private RowsTransformation(Stage stage, TableMetadata metadata, boolean isReverseOrder, boolean enforceIsClosed) + private RowsTransformation(Stage stage, UnfilteredRowIterator partition, boolean enforceIsClosed) { this.stage = stage; - this.metadata = metadata; - this.isReverseOrder = isReverseOrder; + this.partition = partition; this.enforceIsClosed = enforceIsClosed; } @@ -83,25 +82,25 @@ public final class RTBoundValidator extends Transformation<UnfilteredRowIterator if (null == openMarkerDeletionTime) { // there is no open RT in the stream - we are expecting a *_START_BOUND - if (marker.isClose(isReverseOrder)) - throw ise("unexpected end bound or boundary " + marker.toString(metadata)); + if (marker.isClose(partition.isReverseOrder())) + throw ise("unexpected end bound or boundary " + marker.toString(partition.metadata())); } else { // there is an open RT in the stream - we are expecting a *_BOUNDARY or an *_END_BOUND - if (!marker.isClose(isReverseOrder)) - throw ise("start bound followed by another start bound " + marker.toString(metadata)); + if (!marker.isClose(partition.isReverseOrder())) + throw ise("start bound followed by another start bound " + marker.toString(partition.metadata())); // deletion times of open/close markers must match - DeletionTime deletionTime = marker.closeDeletionTime(isReverseOrder); + DeletionTime deletionTime = marker.closeDeletionTime(partition.isReverseOrder()); if (!deletionTime.equals(openMarkerDeletionTime)) - throw ise("open marker and close marker have different deletion times"); + throw ise("open marker and close marker have different deletion times, close=" + deletionTime); openMarkerDeletionTime = null; } - if (marker.isOpen(isReverseOrder)) - openMarkerDeletionTime = marker.openDeletionTime(isReverseOrder); + if (marker.isOpen(partition.isReverseOrder())) + openMarkerDeletionTime = marker.openDeletionTime(partition.isReverseOrder()); return marker; } @@ -115,9 +114,17 @@ public final class RTBoundValidator extends Transformation<UnfilteredRowIterator private IllegalStateException ise(String why) { - String message = - String.format("%s UnfilteredRowIterator for %s has an illegal RT bounds sequence: %s", stage, metadata, why); - throw new IllegalStateException(message); + throw new IllegalStateException(message(why)); + } + + private String message(String why) + { + return String.format("%s UnfilteredRowIterator for %s (key: %s omdt: [%s]) has an illegal RT bounds sequence: %s", + stage, + partition.metadata(), + partition.metadata().partitionKeyType.getString(partition.partitionKey().getKey()), + Objects.toString(openMarkerDeletionTime, "not present"), + why); } } } diff --git a/src/java/org/apache/cassandra/exceptions/QueryCancelledException.java b/src/java/org/apache/cassandra/exceptions/QueryCancelledException.java new file mode 100644 index 0000000000..45b6334b8d --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/QueryCancelledException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.exceptions; + +import org.apache.cassandra.db.ReadCommand; + +public class QueryCancelledException extends RuntimeException +{ + public QueryCancelledException(ReadCommand command) + { + super("Query cancelled for taking too long: " + command.toCQLString()); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 4a66b511be..3c20aad9ce 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -44,11 +44,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.service.paxos.Ballot; -import org.apache.cassandra.service.paxos.Commit; -import org.apache.cassandra.service.paxos.ContentionStrategy; -import org.apache.cassandra.service.paxos.Paxos; -import org.apache.cassandra.service.paxos.PaxosState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +84,7 @@ import org.apache.cassandra.exceptions.CasWriteUnknownResultException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.IsBootstrappingException; import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.exceptions.ReadAbortException; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; @@ -126,6 +122,11 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.service.paxos.ContentionStrategy; +import org.apache.cassandra.service.paxos.Paxos; +import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.v1.PrepareCallback; import org.apache.cassandra.service.paxos.v1.ProposeCallback; import org.apache.cassandra.service.reads.AbstractReadExecutor; @@ -2170,6 +2171,12 @@ public class StorageProxy implements StorageProxyMBean response = command.createEmptyResponse(); readRejected = true; } + catch (QueryCancelledException e) + { + logger.debug("Query cancelled (timeout)", e); + response = null; + assert !command.isCompleted() : "Local read marked as completed despite being aborted by timeout to table " + command.metadata(); + } if (command.complete()) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/TimeoutAbortTest.java b/test/distributed/org/apache/cassandra/distributed/test/TimeoutAbortTest.java new file mode 100644 index 0000000000..8adf30777f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/TimeoutAbortTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; + +import static org.junit.Assert.assertFalse; +import static org.psjava.util.AssertStatus.assertTrue; + +public class TimeoutAbortTest extends TestBaseImpl +{ + @Test + public void timeoutTest() throws IOException, InterruptedException + { + System.setProperty("cassandra.test.read_iteration_delay_ms", "5000"); + try (Cluster cluster = init(Cluster.build(1).start())) + { + cluster.schemaChange(withKeyspace("create table %s.tbl (id int, ck1 int, ck2 int, d int, primary key (id, ck1, ck2))")); + cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl using timestamp 5 where id = 1 and ck1 = 77 "), ConsistencyLevel.ALL); + cluster.get(1).flush(KEYSPACE); + Thread.sleep(1000); + for (int i = 0; i < 100; i++) + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id, ck1, ck2, d) values (1,77,?,1) using timestamp 10"), ConsistencyLevel.ALL, i); + cluster.get(1).flush(KEYSPACE); + boolean caughtException = false; + try + { + cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where id=1 and ck1 = 77"), ConsistencyLevel.ALL); + } + catch (Exception e) + { + assertTrue(e.getClass().getName().contains("ReadTimeoutException")); + caughtException = true; + } + assertTrue(caughtException); + List<String> errors = cluster.get(1).logs().grepForErrors().getResult(); + assertFalse(errors.toString(), errors.stream().anyMatch(s -> s.contains("open RT bound"))); + } + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 43a7952175..bf272b87cf 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -56,6 +56,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -232,7 +233,16 @@ public class ReadCommandTest assertEquals(2, Util.getAll(readCommand).size()); readCommand.abort(); - assertEquals(0, Util.getAll(readCommand).size()); + boolean cancelled = false; + try + { + Util.getAll(readCommand); + } + catch (QueryCancelledException e) + { + cancelled = true; + } + assertTrue(cancelled); } @Test @@ -263,7 +273,16 @@ public class ReadCommandTest assertEquals(2, partitions.get(0).rowCount()); readCommand.abort(); - assertEquals(0, Util.getAll(readCommand).size()); + boolean cancelled = false; + try + { + Util.getAll(readCommand); + } + catch (QueryCancelledException e) + { + cancelled = true; + } + assertTrue(cancelled); } @Test @@ -294,7 +313,16 @@ public class ReadCommandTest assertEquals(2, partitions.get(0).rowCount()); readCommand.abort(); - assertEquals(0, Util.getAll(readCommand).size()); + boolean cancelled = false; + try + { + Util.getAll(readCommand); + } + catch (QueryCancelledException e) + { + cancelled = true; + } + assertTrue(cancelled); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org