This is an automated email from the ASF dual-hosted git repository. jwest pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new d3dadcd Improve logging when mutation passed to commit log is too large d3dadcd is described below commit d3dadcd6f3bbde471e972f8332eb62de0f2d4aae Author: nvharikrishna <n.v.harikrishna.apa...@gmail.com> AuthorDate: Wed Jan 8 22:06:06 2020 +0530 Improve logging when mutation passed to commit log is too large Patch by Venkata Harikrishna Nukala; reviewed by Jordan West for CASSANDRA-14781 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/batchlog/Batch.java | 4 +- .../org/apache/cassandra/db/CounterMutation.java | 39 +++++++++- src/java/org/apache/cassandra/db/IMutation.java | 13 ++++ src/java/org/apache/cassandra/db/Mutation.java | 36 +++++++++ .../db/MutationExceededMaxSizeException.java | 89 ++++++++++++++++++++++ .../apache/cassandra/db/commitlog/CommitLog.java | 14 +--- .../cassandra/db/virtual/VirtualMutation.java | 5 ++ src/java/org/apache/cassandra/hints/Hint.java | 2 +- .../apache/cassandra/schema/MigrationManager.java | 2 +- .../service/reads/repair/BlockingReadRepairs.java | 61 +++++++-------- .../cassandra/test/microbench/MutationBench.java | 2 +- .../db/MutationExceededMaxSizeExceptionTest.java | 46 +++++++++++ .../cassandra/db/commitlog/CommitLogTest.java | 55 +++++++++++-- 14 files changed, 316 insertions(+), 53 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 89c8d7d..8c4cf35 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha5 + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781) * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560) * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726) * Avoid race condition when completing stream sessions (CASSANDRA-15666) diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java index e91e3ca..fb6c5d5 100644 --- a/src/java/org/apache/cassandra/batchlog/Batch.java +++ b/src/java/org/apache/cassandra/batchlog/Batch.java @@ -90,7 +90,7 @@ public final class Batch size += sizeofUnsignedVInt(batch.decodedMutations.size()); for (Mutation mutation : batch.decodedMutations) { - int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version); + int mutationSize = mutation.serializedSize(version); size += sizeofUnsignedVInt(mutationSize); size += mutationSize; } @@ -108,7 +108,7 @@ public final class Batch out.writeUnsignedVInt(batch.decodedMutations.size()); for (Mutation mutation : batch.decodedMutations) { - out.writeUnsignedVInt(Mutation.serializer.serializedSize(mutation, version)); + out.writeUnsignedVInt(mutation.serializedSize(version)); Mutation.serializer.serialize(mutation, out, version); } } diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index bb10a6a..722ad73 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -45,6 +45,9 @@ import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.btree.BTreeSet; import static java.util.concurrent.TimeUnit.*; +import static org.apache.cassandra.net.MessagingService.VERSION_30; +import static org.apache.cassandra.net.MessagingService.VERSION_3014; +import static org.apache.cassandra.net.MessagingService.VERSION_40; public class CounterMutation implements IMutation { @@ -76,6 +79,15 @@ public class CounterMutation implements IMutation return mutation.getPartitionUpdates(); } + public void validateSize(int version, int overhead) + { + long totalSize = serializedSize(version) + overhead; + if(totalSize > MAX_MUTATION_SIZE) + { + throw new MutationExceededMaxSizeException(this, version, totalSize); + } + } + public Mutation getMutation() { return mutation; @@ -308,6 +320,31 @@ public class CounterMutation implements IMutation return DatabaseDescriptor.getCounterWriteRpcTimeout(unit); } + private int serializedSize30; + private int serializedSize3014; + private int serializedSize40; + + public int serializedSize(int version) + { + switch (version) + { + case VERSION_30: + if (serializedSize30 == 0) + serializedSize30 = (int) serializer.serializedSize(this, VERSION_30); + return serializedSize30; + case VERSION_3014: + if (serializedSize3014 == 0) + serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014); + return serializedSize3014; + case VERSION_40: + if (serializedSize40 == 0) + serializedSize40 = (int) serializer.serializedSize(this, VERSION_40); + return serializedSize40; + default: + throw new IllegalStateException("Unknown serialization version: " + version); + } + } + @Override public String toString() { @@ -336,7 +373,7 @@ public class CounterMutation implements IMutation public long serializedSize(CounterMutation cm, int version) { - return Mutation.serializer.serializedSize(cm.mutation, version) + return cm.mutation.serializedSize(version) + TypeSizes.sizeof(cm.consistency.name()); } } diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index 1710cfd..10472c1 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -20,11 +20,14 @@ package org.apache.cassandra.db; import java.util.Collection; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableId; public interface IMutation { + public long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize(); + public void apply(); public String getKeyspaceName(); public Collection<TableId> getTableIds(); @@ -40,6 +43,16 @@ public interface IMutation } /** + * Validates size of mutation does not exceed {@link DatabaseDescriptor#getMaxMutationSize()}. + * + * @param version the MessagingService version the mutation is being serialized for. + * see {@link org.apache.cassandra.net.MessagingService#current_version} + * @param overhead overhadd to add for mutation size to validate. Pass zero if not required but not a negative value. + * @throws {@link MutationExceededMaxSizeException} if {@link DatabaseDescriptor#getMaxMutationSize()} is exceeded + */ + public void validateSize(int version, int overhead); + + /** * Computes the total data size of the specified mutations. * @param mutations the mutations * @return the total data size of the specified mutations diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 3d27ef3..16d20db 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -38,6 +38,9 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.net.MessagingService.VERSION_30; +import static org.apache.cassandra.net.MessagingService.VERSION_3014; +import static org.apache.cassandra.net.MessagingService.VERSION_40; import static org.apache.cassandra.utils.MonotonicClock.approxTime; public class Mutation implements IMutation @@ -119,6 +122,15 @@ public class Mutation implements IMutation return modifications.values(); } + public void validateSize(int version, int overhead) + { + long totalSize = serializedSize(version) + overhead; + if(totalSize > MAX_MUTATION_SIZE) + { + throw new MutationExceededMaxSizeException(this, version, totalSize); + } + } + public PartitionUpdate getPartitionUpdate(TableMetadata table) { return table == null ? null : modifications.get(table.id); @@ -256,6 +268,30 @@ public class Mutation implements IMutation } return buff.append("])").toString(); } + private int serializedSize30; + private int serializedSize3014; + private int serializedSize40; + + public int serializedSize(int version) + { + switch (version) + { + case VERSION_30: + if (serializedSize30 == 0) + serializedSize30 = (int) serializer.serializedSize(this, VERSION_30); + return serializedSize30; + case VERSION_3014: + if (serializedSize3014 == 0) + serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014); + return serializedSize3014; + case VERSION_40: + if (serializedSize40 == 0) + serializedSize40 = (int) serializer.serializedSize(this, VERSION_40); + return serializedSize40; + default: + throw new IllegalStateException("Unknown serialization version: " + version); + } + } /** * Creates a new simple mutuation builder. diff --git a/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java b/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java new file mode 100644 index 0000000..084c21e --- /dev/null +++ b/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.partitions.PartitionUpdate; + +import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE; + +public class MutationExceededMaxSizeException extends RuntimeException +{ + public static final int PARTITION_MESSAGE_LIMIT = 1024; + + public final long mutationSize; + + MutationExceededMaxSizeException(IMutation mutation, int serializationVersion, long totalSize) + { + super(prepareMessage(mutation, serializationVersion, totalSize)); + this.mutationSize = totalSize; + } + + private static String prepareMessage(IMutation mutation, int version, long totalSize) + { + List<String> topPartitions = mutation.getPartitionUpdates().stream() + .sorted((upd1, upd2) -> + Long.compare(PartitionUpdate.serializer.serializedSize(upd2, version), + PartitionUpdate.serializer.serializedSize(upd1, version))) + .map(upd -> String.format("%s.%s", + upd.metadata().name, + upd.metadata().partitionKeyType.getString(upd.partitionKey().getKey()))) + .collect(Collectors.toList()); + + String topKeys = makeTopKeysString(topPartitions, PARTITION_MESSAGE_LIMIT); + return String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s. Top keys are: %s", + totalSize, + MAX_MUTATION_SIZE, + mutation.getKeyspaceName(), + topKeys); + } + + @VisibleForTesting + static String makeTopKeysString(List<String> keys, int maxLength) { + Iterator<String> iterator = keys.listIterator(); + StringBuilder stringBuilder = new StringBuilder(); + while (iterator.hasNext()) + { + String key = iterator.next(); + + if (stringBuilder.length() == 0) + { + stringBuilder.append(key); //ensures atleast one key is added + iterator.remove(); + } + else if (stringBuilder.length() + key.length() + 2 <= maxLength) // 2 for ", " + { + stringBuilder.append(", ").append(key); + iterator.remove(); + } + else + break; + } + + if (keys.size() > 0) + stringBuilder.append(" and ").append(keys.size()).append(" more."); + + return stringBuilder.toString(); + } +} diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index c9e79cd..e7f8743 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -64,10 +64,6 @@ public class CommitLog implements CommitLogMBean public static final CommitLog instance = CommitLog.construct(); - // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly - // empty segments when writing large records - final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize(); - final public AbstractCommitLogSegmentManager segmentManager; public final CommitLogArchiver archiver; @@ -265,19 +261,13 @@ public class CommitLog implements CommitLogMBean { assert mutation != null; + mutation.validateSize(MessagingService.current_version, ENTRY_OVERHEAD_SIZE); + try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) { Mutation.serializer.serialize(mutation, dob, MessagingService.current_version); int size = dob.getLength(); - int totalSize = size + ENTRY_OVERHEAD_SIZE; - if (totalSize > MAX_MUTATION_SIZE) - { - throw new IllegalArgumentException(String.format("Mutation of %s is too large for the maximum size of %s", - FBUtilities.prettyPrintMemory(totalSize), - FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE))); - } - Allocation alloc = segmentManager.allocate(mutation, totalSize); CRC32 checksum = new CRC32(); diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java index 6db0acd..09ac4a6 100644 --- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java +++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java @@ -109,4 +109,9 @@ public final class VirtualMutation implements IMutation { // no-op } + + public void validateSize(int version, int overhead) + { + // no-op + } } diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java index 7e4618c..6c7c5d4 100644 --- a/src/java/org/apache/cassandra/hints/Hint.java +++ b/src/java/org/apache/cassandra/hints/Hint.java @@ -149,7 +149,7 @@ public final class Hint { long size = sizeof(hint.creationTime); size += sizeofUnsignedVInt(hint.gcgs); - size += Mutation.serializer.serializedSize(hint.mutation, version); + size += hint.mutation.serializedSize(version); return size; } diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index 257634c..4f91d94 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -487,7 +487,7 @@ public class MigrationManager { int size = TypeSizes.sizeof(schema.size()); for (Mutation mutation : schema) - size += Mutation.serializer.serializedSize(mutation, version); + size += mutation.serializedSize(version); return size; } } diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java index 3a4978e..68d1b4c 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java @@ -21,11 +21,11 @@ package org.apache.cassandra.service.reads.repair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.MutationExceededMaxSizeException; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; @@ -33,6 +33,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.tracing.Tracing; +import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE; + public class BlockingReadRepairs { private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepairs.class); @@ -51,43 +53,42 @@ public class BlockingReadRepairs DecoratedKey key = update.partitionKey(); Mutation mutation = new Mutation(update); - Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); - TableMetadata metadata = update.metadata(); - int messagingVersion = MessagingService.instance().versions.get(destination); - int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); - int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); - - - if (mutationSize <= maxMutationSize) + try { + mutation.validateSize(messagingVersion, 0); return mutation; } - else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) - { - logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", - mutationSize, - maxMutationSize, - metadata, - metadata.partitionKeyType.getString(key.getKey()), - destination); - return null; - } - else + catch (MutationExceededMaxSizeException e) { - logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", - mutationSize, - maxMutationSize, - metadata, - metadata.partitionKeyType.getString(key.getKey()), - destination); + Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + TableMetadata metadata = update.metadata(); - if (!suppressException) + if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) { - int blockFor = consistency.blockFor(keyspace); - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + e.mutationSize, + MAX_MUTATION_SIZE, + metadata, + metadata.partitionKeyType.getString(key.getKey()), + destination); + } + else + { + logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + e.mutationSize, + MAX_MUTATION_SIZE, + metadata, + metadata.partitionKeyType.getString(key.getKey()), + destination); + + if (!suppressException) + { + int blockFor = consistency.blockFor(keyspace); + Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } } return null; } diff --git a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java index 074e183..41d6aab 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java @@ -100,7 +100,7 @@ public class MutationBench Schema.instance.load(ksm.withSwapped(ksm.tables.with(metadata))); mutation = (Mutation)UpdateBuilder.create(metadata, 1L).newRow(1L).add("commentid", 32L).makeMutation(); - buffer = ByteBuffer.allocate((int) Mutation.serializer.serializedSize(mutation, MessagingService.current_version)); + buffer = ByteBuffer.allocate(mutation.serializedSize(MessagingService.current_version)); outputBuffer = new DataOutputBufferFixed(buffer); inputBuffer = new DataInputBuffer(buffer, false); diff --git a/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java b/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java new file mode 100644 index 0000000..81d9735 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; + +import static org.apache.cassandra.db.MutationExceededMaxSizeException.makeTopKeysString; +import static org.junit.Assert.*; + +public class MutationExceededMaxSizeExceptionTest +{ + + @Test + public void testMakePKString() + { + List<String> keys = Arrays.asList("aaa", "bbb", "ccc"); + + assertEquals(0, makeTopKeysString(new ArrayList<>(), 1024).length()); + assertEquals("aaa and 2 more.", makeTopKeysString(new ArrayList<>(keys), 0)); + assertEquals("aaa and 2 more.", makeTopKeysString(new ArrayList<>(keys), 5)); + assertEquals("aaa, bbb, ccc", makeTopKeysString(new ArrayList<>(keys), 13)); + assertEquals("aaa, bbb, ccc", makeTopKeysString(new ArrayList<>(keys), 1024)); + assertEquals("aaa, bbb and 1 more.", makeTopKeysString(new ArrayList<>(keys), 8)); + assertEquals("aaa, bbb and 1 more.", makeTopKeysString(new ArrayList<>(keys), 10)); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 1e3f622..0e7f30d 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -69,9 +69,13 @@ import org.apache.cassandra.utils.KillerForTests; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.vint.VIntCoding; +import org.junit.After; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.cassandra.db.marshal.IntegerType; @@ -420,7 +424,7 @@ public abstract class CommitLogTest max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size - int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize); + int mutationOverhead = rm.serializedSize(MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize); max -= mutationOverhead; // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value. @@ -447,7 +451,7 @@ public abstract class CommitLogTest CommitLog.instance.add(rm); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = MutationExceededMaxSizeException.class) public void testExceedRecordLimit() throws Exception { Keyspace ks = Keyspace.open(KEYSPACE1); @@ -459,6 +463,47 @@ public abstract class CommitLogTest CommitLog.instance.add(rm); throw new AssertionError("mutation larger than limit was accepted"); } + @Test + public void testExceedRecordLimitWithMultiplePartitions() throws Exception + { + CommitLog.instance.resetUnsafe(true); + List<Mutation> mutations = new ArrayList<>(); + Keyspace ks = Keyspace.open(KEYSPACE1); + char[] keyChars = new char[MutationExceededMaxSizeException.PARTITION_MESSAGE_LIMIT]; + Arrays.fill(keyChars, 'k'); + String key = new String(keyChars); + + // large mutation + mutations.add(new RowUpdateBuilder(ks.getColumnFamilyStore(STANDARD1).metadata(), 0, key) + .clustering("bytes") + .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) + .build()); + + // smaller mutation + mutations.add(new RowUpdateBuilder(ks.getColumnFamilyStore(STANDARD2).metadata(), 0, key) + .clustering("bytes") + .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize() - 1024)) + .build()); + + Mutation mutation = Mutation.merge(mutations); + try + { + CommitLog.instance.add(Mutation.merge(mutations)); + throw new AssertionError("mutation larger than limit was accepted"); + } + catch (MutationExceededMaxSizeException exception) + { + String message = exception.getMessage(); + + long mutationSize = mutation.serializedSize(MessagingService.current_version) + ENTRY_OVERHEAD_SIZE; + final String expectedMessagePrefix = String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s.", + mutationSize, + DatabaseDescriptor.getMaxMutationSize(), + KEYSPACE1); + assertTrue(message.startsWith(expectedMessagePrefix)); + assertTrue(message.contains(String.format("%s.%s and 1 more.", STANDARD1, key))); + } + } protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception { @@ -655,7 +700,7 @@ public abstract class CommitLogTest { DatabaseDescriptor.setAutoSnapshot(false); Keyspace notDurableKs = Keyspace.open(KEYSPACE2); - Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites); + assertFalse(notDurableKs.getMetadata().params.durableWrites); ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); new RowUpdateBuilder(cfs.metadata(), 0, "key1") @@ -699,7 +744,7 @@ public abstract class CommitLogTest SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata()); List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); - Assert.assertFalse(activeSegments.isEmpty()); + assertFalse(activeSegments.isEmpty()); File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); replayer.replayFiles(files); @@ -736,7 +781,7 @@ public abstract class CommitLogTest SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata()); List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); - Assert.assertFalse(activeSegments.isEmpty()); + assertFalse(activeSegments.isEmpty()); File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); replayer.replayFiles(files); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org