This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 39eb7db65fd45653fdece1087ba75c3356a10c97 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Tue Dec 17 17:56:18 2019 +0000 Exclude legacy counters from repaired data digest Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15461 --- CHANGES.txt | 1 + .../cassandra/db/AbstractClusteringPrefix.java | 10 +- .../org/apache/cassandra/db/ClusteringPrefix.java | 8 +- src/java/org/apache/cassandra/db/Columns.java | 6 +- src/java/org/apache/cassandra/db/DeletionTime.java | 6 +- src/java/org/apache/cassandra/db/Digest.java | 200 +++++++++++++++++++++ src/java/org/apache/cassandra/db/LivenessInfo.java | 17 +- src/java/org/apache/cassandra/db/ReadCommand.java | 14 +- src/java/org/apache/cassandra/db/ReadResponse.java | 8 +- .../cassandra/db/context/CounterContext.java | 20 +-- .../db/partitions/PartitionIterators.java | 1 - .../partitions/UnfilteredPartitionIterators.java | 8 +- .../org/apache/cassandra/db/rows/AbstractCell.java | 24 +-- .../org/apache/cassandra/db/rows/AbstractRow.java | 15 +- .../org/apache/cassandra/db/rows/CellPath.java | 8 +- .../org/apache/cassandra/db/rows/ColumnData.java | 7 +- .../cassandra/db/rows/ComplexColumnData.java | 8 +- .../db/rows/RangeTombstoneBoundMarker.java | 8 +- .../db/rows/RangeTombstoneBoundaryMarker.java | 10 +- src/java/org/apache/cassandra/db/rows/Row.java | 8 +- .../org/apache/cassandra/db/rows/RowIterators.java | 17 +- .../org/apache/cassandra/db/rows/Unfiltered.java | 9 +- .../cassandra/db/rows/UnfilteredRowIterators.java | 20 +-- .../apache/cassandra/dht/RandomPartitioner.java | 3 +- .../org/apache/cassandra/repair/Validator.java | 165 +---------------- .../apache/cassandra/schema/SchemaConstants.java | 4 +- .../apache/cassandra/schema/SchemaKeyspace.java | 9 +- .../org/apache/cassandra/utils/FBUtilities.java | 14 ++ .../org/apache/cassandra/utils/HashingUtils.java | 109 ----------- src/java/org/apache/cassandra/utils/MD5Digest.java | 2 +- src/java/org/apache/cassandra/utils/UUIDGen.java | 24 ++- .../apache/cassandra/cache/CacheProviderTest.java | 15 +- .../org/apache/cassandra/db/CounterCellTest.java | 17 +- test/unit/org/apache/cassandra/db/DigestTest.java | 122 +++++++++++++ .../org/apache/cassandra/db/PartitionTest.java | 37 ++-- .../org/apache/cassandra/db/ReadCommandTest.java | 88 ++++++++- .../org/apache/cassandra/db/RowUpdateBuilder.java | 10 ++ .../org/apache/cassandra/repair/ValidatorTest.java | 47 +---- .../repair/asymmetric/DifferenceHolderTest.java | 6 +- .../apache/cassandra/utils/HashingUtilsTest.java | 92 ---------- .../org/apache/cassandra/utils/MerkleTreeTest.java | 7 +- .../apache/cassandra/utils/MerkleTreesTest.java | 6 +- 42 files changed, 598 insertions(+), 612 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 85ddd53..3d5217a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha3 + * Exclude legacy counter shards from repaired data tracking (CASSANDRA-15461) * Make it easier to add trace headers to messages (CASSANDRA-15499) * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938) * Improve error when JVM 11 can't access required modules (CASSANDRA-15468) diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java index 884a091..8714936 100644 --- a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java @@ -20,10 +20,6 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; import java.util.Objects; -import com.google.common.hash.Hasher; - -import org.apache.cassandra.utils.HashingUtils; - public abstract class AbstractClusteringPrefix implements ClusteringPrefix { public ClusteringPrefix clustering() @@ -42,15 +38,15 @@ public abstract class AbstractClusteringPrefix implements ClusteringPrefix return size; } - public void digest(Hasher hasher) + public void digest(Digest digest) { for (int i = 0; i < size(); i++) { ByteBuffer bb = get(i); if (bb != null) - HashingUtils.updateBytes(hasher, bb.duplicate()); + digest.update(bb); } - HashingUtils.updateWithByte(hasher, kind().ordinal()); + digest.updateWithByte(kind().ordinal()); } @Override diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 2d9198b..357d746 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import com.google.common.hash.Hasher; - import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.config.*; import org.apache.cassandra.db.marshal.CompositeType; @@ -218,11 +216,11 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable public ByteBuffer get(int i); /** - * Adds the data of this clustering prefix to the provided Hasher instance. + * Adds the data of this clustering prefix to the provided Digest instance. * - * @param hasher the Hasher instance to which to add this prefix. + * @param digest the Digest instance to which to add this prefix. */ - public void digest(Hasher hasher); + public void digest(Digest digest); /** * The size of the data hold by this prefix. diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index bf9e174..f56072e 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; -import com.google.common.hash.Hasher; import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.exceptions.UnknownColumnException; @@ -37,7 +36,6 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.BTreeSearchIterator; @@ -375,10 +373,10 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle return column -> iter.next(column) != null; } - public void digest(Hasher hasher) + public void digest(Digest digest) { for (ColumnMetadata c : this) - HashingUtils.updateBytes(hasher, c.name.bytes.duplicate()); + digest.update(c.name.bytes); } /** diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index 14e846d..b2d9343 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -20,14 +20,12 @@ package org.apache.cassandra.db; import java.io.IOException; import com.google.common.base.Objects; -import com.google.common.hash.Hasher; import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.ObjectSizes; /** @@ -80,12 +78,12 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory return markedForDeleteAt() == Long.MIN_VALUE && localDeletionTime() == Integer.MAX_VALUE; } - public void digest(Hasher hasher) + public void digest(Digest digest) { // localDeletionTime is basically a metadata of the deletion time that tells us when it's ok to purge it. // It's thus intrinsically a local information and shouldn't be part of the digest (which exists for // cross-nodes comparisons). - HashingUtils.updateWithLong(hasher, markedForDeleteAt()); + digest.updateWithLong(markedForDeleteAt()); } /** diff --git a/src/java/org/apache/cassandra/db/Digest.java b/src/java/org/apache/cassandra/db/Digest.java new file mode 100644 index 0000000..bac6386 --- /dev/null +++ b/src/java/org/apache/cassandra/db/Digest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.utils.FastByteOperations; + +public class Digest +{ + private static final ThreadLocal<byte[]> localBuffer = ThreadLocal.withInitial(() -> new byte[4096]); + + private final Hasher hasher; + private long inputBytes = 0; + + @SuppressWarnings("deprecation") + private static Hasher md5() + { + return Hashing.md5().newHasher(); + } + + public static Digest forReadResponse() + { + return new Digest(md5()); + } + + public static Digest forSchema() + { + return new Digest(md5()); + } + + public static Digest forValidator() + { + // Uses a Hasher that concatenates the hash code from 2 hash functions + // (murmur3_128) with different seeds to produce a 256 bit hashcode + return new Digest(Hashing.concatenating(Hashing.murmur3_128(1000), + Hashing.murmur3_128(2000)) + .newHasher()); + } + + public static Digest forRepairedDataTracking() + { + return new Digest(Hashing.crc32c().newHasher()) + { + @Override + public Digest updateWithCounterContext(ByteBuffer context) + { + // for the purposes of repaired data tracking on the read path, exclude + // contexts with legacy shards as these may be irrevocably different on + // different replicas + if (CounterContext.instance().hasLegacyShards(context)) + return this; + + return super.updateWithCounterContext(context); + } + }; + } + + Digest(Hasher hasher) + { + this.hasher = hasher; + } + + public Digest update(byte[] input, int offset, int len) + { + hasher.putBytes(input, offset, len); + inputBytes += len; + return this; + } + + /** + * Update the digest with the bytes from the supplied buffer. This does + * not modify the position of the supplied buffer, so callers are not + * required to duplicate() the source buffer before calling + */ + public Digest update(ByteBuffer input) + { + return update(input, input.position(), input.remaining()); + } + + /** + * Update the digest with the bytes sliced from the supplied buffer. This does + * not modify the position of the supplied buffer, so callers are not + * required to duplicate() the source buffer before calling + */ + private Digest update(ByteBuffer input, int pos, int len) + { + if (len <= 0) + return this; + + if (input.hasArray()) + { + byte[] b = input.array(); + int ofs = input.arrayOffset(); + hasher.putBytes(b, ofs + pos, len); + inputBytes += len; + } + else + { + byte[] tempArray = localBuffer.get(); + while (len > 0) + { + int chunk = Math.min(len, tempArray.length); + FastByteOperations.copy(input, pos, tempArray, 0, chunk); + hasher.putBytes(tempArray, 0, chunk); + len -= chunk; + pos += chunk; + inputBytes += chunk; + } + } + return this; + } + + /** + * Update the digest with the content of a counter context. + * Note that this skips the header entirely since the header information + * has local meaning only, while digests are meant for comparison across + * nodes. This means in particular that we always have: + * updateDigest(ctx) == updateDigest(clearAllLocal(ctx)) + */ + public Digest updateWithCounterContext(ByteBuffer context) + { + // context can be empty due to the optimization from CASSANDRA-10657 + if (!context.hasRemaining()) + return this; + + int pos = context.position() + CounterContext.headerLength(context); + int len = context.limit() - pos; + update(context, pos, len); + return this; + } + + public Digest updateWithByte(int val) + { + hasher.putByte((byte) (val & 0xFF)); + inputBytes++; + return this; + } + + public Digest updateWithInt(int val) + { + hasher.putByte((byte) ((val >>> 24) & 0xFF)); + hasher.putByte((byte) ((val >>> 16) & 0xFF)); + hasher.putByte((byte) ((val >>> 8) & 0xFF)); + hasher.putByte((byte) ((val >>> 0) & 0xFF)); + inputBytes += 4; + return this; + } + + public Digest updateWithLong(long val) + { + hasher.putByte((byte) ((val >>> 56) & 0xFF)); + hasher.putByte((byte) ((val >>> 48) & 0xFF)); + hasher.putByte((byte) ((val >>> 40) & 0xFF)); + hasher.putByte((byte) ((val >>> 32) & 0xFF)); + hasher.putByte((byte) ((val >>> 24) & 0xFF)); + hasher.putByte((byte) ((val >>> 16) & 0xFF)); + hasher.putByte((byte) ((val >>> 8) & 0xFF)); + hasher.putByte((byte) ((val >>> 0) & 0xFF)); + inputBytes += 8; + return this; + } + + public Digest updateWithBoolean(boolean val) + { + updateWithByte(val ? 0 : 1); + return this; + } + + public byte[] digest() + { + return hasher.hash().asBytes(); + } + + public long inputBytes() + { + return inputBytes; + } +} + diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index 1340c00..b1ea3f6 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -19,11 +19,8 @@ package org.apache.cassandra.db; import java.util.Objects; -import com.google.common.hash.Hasher; - import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.HashingUtils; /** * Stores the information relating to the liveness of the primary key columns of a row. @@ -153,11 +150,11 @@ public class LivenessInfo /** * Adds this liveness information to the provided digest. * - * @param hasher the hasher digest to add this liveness information to. + * @param digest the digest to add this liveness information to. */ - public void digest(Hasher hasher) + public void digest(Digest digest) { - HashingUtils.updateWithLong(hasher, timestamp()); + digest.updateWithLong(timestamp()); } /** @@ -331,11 +328,11 @@ public class LivenessInfo } @Override - public void digest(Hasher hasher) + public void digest(Digest digest) { - super.digest(hasher); - HashingUtils.updateWithInt(hasher, localExpirationTime); - HashingUtils.updateWithInt(hasher, ttl); + super.digest(digest); + digest.updateWithInt(localExpirationTime) + .updateWithInt(ttl); } @Override diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 68ce2ea..9485abc 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -29,8 +29,6 @@ import javax.annotation.Nullable; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +36,6 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.utils.ApproximateTime; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.RTBoundCloser; @@ -67,7 +64,6 @@ import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.HashingUtils; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; @@ -782,14 +778,14 @@ public abstract class ReadCommand extends AbstractReadQuery private static class RepairedDataInfo { - private Hasher hasher; + private Digest hasher; private boolean isConclusive = true; ByteBuffer getDigest() { return hasher == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER - : ByteBuffer.wrap(getHasher().hash().asBytes()); + : ByteBuffer.wrap(getHasher().digest()); } boolean isConclusive() @@ -804,7 +800,7 @@ public abstract class ReadCommand extends AbstractReadQuery void trackPartitionKey(DecoratedKey key) { - HashingUtils.updateBytes(getHasher(), key.getKey().duplicate()); + getHasher().update(key.getKey()); } void trackDeletion(DeletionTime deletion) @@ -822,10 +818,10 @@ public abstract class ReadCommand extends AbstractReadQuery row.digest(getHasher()); } - private Hasher getHasher() + private Digest getHasher() { if (hasher == null) - hasher = Hashing.crc32c().newHasher(); + hasher = Digest.forRepairedDataTracking(); return hasher; } diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 2ddb6a7..affbbbe 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -21,7 +21,6 @@ import java.io.*; import java.nio.ByteBuffer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.hash.Hasher; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.*; @@ -34,7 +33,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.HashingUtils; public abstract class ReadResponse { @@ -124,9 +122,9 @@ public abstract class ReadResponse protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command) { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - UnfilteredPartitionIterators.digest(iterator, hasher, command.digestVersion()); - return ByteBuffer.wrap(hasher.hash().asBytes()); + Digest digest = Digest.forReadResponse(); + UnfilteredPartitionIterators.digest(iterator, digest, command.digestVersion()); + return ByteBuffer.wrap(digest.digest()); } private static class DigestResponse extends ReadResponse diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java index 01c2f1d..6a618ca 100644 --- a/src/java/org/apache/cassandra/db/context/CounterContext.java +++ b/src/java/org/apache/cassandra/db/context/CounterContext.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import com.google.common.annotations.VisibleForTesting; -import com.google.common.hash.Hasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,7 +167,7 @@ public class CounterContext return state.context; } - private static int headerLength(ByteBuffer context) + public static int headerLength(ByteBuffer context) { return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH; } @@ -684,23 +683,6 @@ public class CounterContext } /** - * Update a {@link Hasher} with the content of a context. - * Note that this skips the header entirely since the header information - * has local meaning only, while digests are meant for comparison across - * nodes. This means in particular that we always have: - * updateDigest(ctx) == updateDigest(clearAllLocal(ctx)) - */ - public void updateDigest(Hasher hasher, ByteBuffer context) - { - // context can be empty due to the optimization from CASSANDRA-10657 - if (!context.hasRemaining()) - return; - ByteBuffer dup = context.duplicate(); - dup.position(context.position() + headerLength(context)); - HashingUtils.updateBytes(hasher, dup); - } - - /** * Returns the clock and the count associated with the local counter id, or (0, 0) if no such shard is present. */ public ClockAndCount getLocalClockAndCount(ByteBuffer context) diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java index 74a61d6..874db05 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -20,7 +20,6 @@ package org.apache.cassandra.db.partitions; import java.util.*; import org.apache.cassandra.db.EmptyIterators; -import org.apache.cassandra.db.transform.FilteredPartitions; import org.apache.cassandra.db.transform.MorePartitions; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.AbstractIterator; diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 7990474..945bcb4 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -21,8 +21,6 @@ import java.io.IOError; import java.io.IOException; import java.util.*; -import com.google.common.hash.Hasher; - import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; @@ -259,16 +257,16 @@ public abstract class UnfilteredPartitionIterators * Caller must close the provided iterator. * * @param iterator the iterator to digest. - * @param hasher the {@link Hasher} to use for the digest. + * @param digest the {@link Digest} to use. * @param version the messaging protocol to use when producing the digest. */ - public static void digest(UnfilteredPartitionIterator iterator, Hasher hasher, int version) + public static void digest(UnfilteredPartitionIterator iterator, Digest digest, int version) { while (iterator.hasNext()) { try (UnfilteredRowIterator partition = iterator.next()) { - UnfilteredRowIterators.digest(partition, hasher, version); + UnfilteredRowIterators.digest(partition, digest, version); } } } diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index bfe7396..51c9ff4 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -20,16 +20,14 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.util.Objects; -import com.google.common.hash.Hasher; - -import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.DeletionPurger; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.memory.AbstractAllocator; /** @@ -120,22 +118,18 @@ public abstract class AbstractCell extends Cell + (path == null ? 0 : path.dataSize()); } - public void digest(Hasher hasher) + public void digest(Digest digest) { if (isCounterCell()) - { - CounterContext.instance().updateDigest(hasher, value()); - } + digest.updateWithCounterContext(value()); else - { - HashingUtils.updateBytes(hasher, value().duplicate()); - } + digest.update(value()); - HashingUtils.updateWithLong(hasher, timestamp()); - HashingUtils.updateWithInt(hasher, ttl()); - HashingUtils.updateWithBoolean(hasher, isCounterCell()); + digest.updateWithLong(timestamp()) + .updateWithInt(ttl()) + .updateWithBoolean(isCounterCell()); if (path() != null) - path().digest(hasher); + path().digest(digest); } public void validate() diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index f719db5..2018d4e 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -17,21 +17,18 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; -import java.util.AbstractCollection; import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import com.google.common.collect.Iterables; -import com.google.common.hash.Hasher; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.HashingUtils; /** * Base abstract class for {@code Row} implementations. @@ -61,16 +58,16 @@ public abstract class AbstractRow implements Row return clustering() == Clustering.STATIC_CLUSTERING; } - public void digest(Hasher hasher) + public void digest(Digest digest) { - HashingUtils.updateWithByte(hasher, kind().ordinal()); - clustering().digest(hasher); + digest.updateWithByte(kind().ordinal()); + clustering().digest(digest); - deletion().digest(hasher); - primaryKeyLivenessInfo().digest(hasher); + deletion().digest(digest); + primaryKeyLivenessInfo().digest(digest); for (ColumnData cd : this) - cd.digest(hasher); + cd.digest(digest); } public void validateData(TableMetadata metadata) diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java index 94fa8e7..1bf8b8f 100644 --- a/src/java/org/apache/cassandra/db/rows/CellPath.java +++ b/src/java/org/apache/cassandra/db/rows/CellPath.java @@ -21,12 +21,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import com.google.common.hash.Hasher; - +import org.apache.cassandra.db.Digest; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.memory.AbstractAllocator; @@ -56,10 +54,10 @@ public abstract class CellPath return size; } - public void digest(Hasher hasher) + public void digest(Digest digest) { for (int i = 0; i < size(); i++) - HashingUtils.updateBytes(hasher, get(i).duplicate()); + digest.update(get(i)); } public abstract CellPath copy(AbstractAllocator allocator); diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java index f2da132..e5f5550 100644 --- a/src/java/org/apache/cassandra/db/rows/ColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -19,8 +19,7 @@ package org.apache.cassandra.db.rows; import java.util.Comparator; -import com.google.common.hash.Hasher; - +import org.apache.cassandra.db.Digest; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.db.DeletionPurger; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -75,9 +74,9 @@ public abstract class ColumnData /** * Adds the data to the provided digest. * - * @param hasher the {@link Hasher} to add the data to. + * @param digest the {@link Digest} to add the data to. */ - public abstract void digest(Hasher hasher); + public abstract void digest(Digest digest); /** * Returns a copy of the data where all timestamps for live data have replaced by {@code newTimestamp} and diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index aa1150c..832167f 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -22,10 +22,10 @@ import java.util.Iterator; import java.util.Objects; import com.google.common.base.Function; -import com.google.common.hash.Hasher; import org.apache.cassandra.db.DeletionPurger; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.ByteType; @@ -129,13 +129,13 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> cell.validate(); } - public void digest(Hasher hasher) + public void digest(Digest digest) { if (!complexDeletion.isLive()) - complexDeletion.digest(hasher); + complexDeletion.digest(digest); for (Cell cell : this) - cell.digest(hasher); + cell.digest(digest); } public boolean hasInvalidDeletions() diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java index 094cf72..51d8264 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@ -20,8 +20,6 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.util.Objects; -import com.google.common.hash.Hasher; - import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.utils.memory.AbstractAllocator; @@ -132,10 +130,10 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus return new RangeTombstoneBoundMarker(clustering(), newDeletionTime); } - public void digest(Hasher hasher) + public void digest(Digest digest) { - bound.digest(hasher); - deletion.digest(hasher); + bound.digest(digest); + deletion.digest(digest); } public String toString(TableMetadata metadata) diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java index 1cbf31c..6a931c9 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@ -20,8 +20,6 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.util.Objects; -import com.google.common.hash.Hasher; - import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.utils.memory.AbstractAllocator; @@ -150,11 +148,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C return new RangeTombstoneBoundMarker(openBound(reversed), openDeletionTime(reversed)); } - public void digest(Hasher hasher) + public void digest(Digest digest) { - bound.digest(hasher); - endDeletion.digest(hasher); - startDeletion.digest(hasher); + bound.digest(digest); + endDeletion.digest(digest); + startDeletion.digest(digest); } public String toString(TableMetadata metadata) diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 0174adc..6f0b43e 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -21,14 +21,12 @@ import java.util.*; import java.util.function.Consumer; import com.google.common.base.Predicate; -import com.google.common.hash.Hasher; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Commit; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; @@ -391,10 +389,10 @@ public interface Row extends Unfiltered, Iterable<ColumnData> return time.deletes(cell); } - public void digest(Hasher hasher) + public void digest(Digest digest) { - time.digest(hasher); - HashingUtils.updateWithBoolean(hasher, isShadowable); + time.digest(digest); + digest.updateWithBoolean(isShadowable); } public int dataSize() diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index d340777..640cbc8 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.db.rows; -import com.google.common.hash.Hasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.HashingUtils; /** * Static methods to work with row iterators. @@ -35,20 +34,20 @@ public abstract class RowIterators private RowIterators() {} - public static void digest(RowIterator iterator, Hasher hasher) + public static void digest(RowIterator iterator, Digest digest) { // TODO: we're not computing digest the same way that old nodes. This is // currently ok as this is only used for schema digest and the is no exchange // of schema digest between different versions. If this changes however, // we'll need to agree on a version. - HashingUtils.updateBytes(hasher, iterator.partitionKey().getKey().duplicate()); - iterator.columns().regulars.digest(hasher); - iterator.columns().statics.digest(hasher); - HashingUtils.updateWithBoolean(hasher, iterator.isReverseOrder()); - iterator.staticRow().digest(hasher); + digest.update(iterator.partitionKey().getKey()); + iterator.columns().regulars.digest(digest); + iterator.columns().statics.digest(digest); + digest.updateWithBoolean(iterator.isReverseOrder()); + iterator.staticRow().digest(digest); while (iterator.hasNext()) - iterator.next().digest(hasher); + iterator.next().digest(digest); } /** diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java index 81b63b7..f5c5ed0 100644 --- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java +++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java @@ -17,8 +17,7 @@ */ package org.apache.cassandra.db.rows; -import com.google.common.hash.Hasher; - +import org.apache.cassandra.db.Digest; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.Clusterable; @@ -39,11 +38,11 @@ public interface Unfiltered extends Clusterable public Kind kind(); /** - * Digest the atom using the provided {@link Hasher}. + * Digest the atom using the provided {@link Digest}. * - * @param hasher the {@see Hasher} to use. + * @param digest the {@see Digest} to use. */ - public void digest(Hasher hasher); + public void digest(Digest digest); /** * Validate the data of this atom. diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 21e1954..b0af16d 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.rows; import java.util.*; -import com.google.common.hash.Hasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +32,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.IMergeIterator; import org.apache.cassandra.utils.MergeIterator; @@ -181,14 +179,14 @@ public abstract class UnfilteredRowIterators * Digests the partition represented by the provided iterator. * * @param iterator the iterator to digest. - * @param hasher the {@link Hasher} to use for the digest. + * @param digest the {@link Digest} to use. * @param version the messaging protocol to use when producing the digest. */ - public static void digest(UnfilteredRowIterator iterator, Hasher hasher, int version) + public static void digest(UnfilteredRowIterator iterator, Digest digest, int version) { - HashingUtils.updateBytes(hasher, iterator.partitionKey().getKey().duplicate()); - iterator.partitionLevelDeletion().digest(hasher); - iterator.columns().regulars.digest(hasher); + digest.update(iterator.partitionKey().getKey()); + iterator.partitionLevelDeletion().digest(digest); + iterator.columns().regulars.digest(digest); // When serializing an iterator, we skip the static columns if the iterator has not static row, even if the // columns() object itself has some (the columns() is a superset of what the iterator actually contains, and // will correspond to the queried columns pre-serialization). So we must avoid taking the satic column names @@ -200,14 +198,14 @@ public abstract class UnfilteredRowIterators // different), but removing them entirely is stricly speaking a breaking change (it would create mismatches on // upgrade) so we can only do on the next protocol version bump. if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) - iterator.columns().statics.digest(hasher); - HashingUtils.updateWithBoolean(hasher, iterator.isReverseOrder()); - iterator.staticRow().digest(hasher); + iterator.columns().statics.digest(digest); + digest.updateWithBoolean(iterator.isReverseOrder()); + iterator.staticRow().digest(digest); while (iterator.hasNext()) { Unfiltered unfiltered = iterator.next(); - unfiltered.digest(hasher); + unfiltered.digest(digest); } } diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index 0457a89..241b785 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -36,7 +36,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.GuidGenerator; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.Pair; @@ -61,7 +60,7 @@ public class RandomPartitioner implements IPartitioner @Override protected MessageDigest initialValue() { - return HashingUtils.newMessageDigest("MD5"); + return FBUtilities.newMessageDigest("MD5"); } @Override diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 341c5b3..2f71729 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.Range; @@ -195,171 +196,15 @@ public class Validator implements Runnable return range.contains(t); } - /** - * Hasher that concatenates the hash code from 2 hash functions (murmur3_128) with different - * seeds and counts the number of bytes we hashed. - * - * Everything hashed by this class is hashed by both hash functions and the - * resulting hashcode is a concatenation of the output bytes from each. - * - * Idea from Guavas Hashing.ConcatenatedHashFunction, but that is package-private so we can't use it - */ - @VisibleForTesting - static class CountingHasher implements Hasher - { - @VisibleForTesting - static final HashFunction[] hashFunctions = new HashFunction[2]; - - static - { - for (int i = 0; i < hashFunctions.length; i++) - hashFunctions[i] = Hashing.murmur3_128(i * 1000); - } - private long count; - private final int bits; - private final Hasher[] underlying = new Hasher[2]; - - CountingHasher() - { - int bits = 0; - for (int i = 0; i < underlying.length; i++) - { - this.underlying[i] = hashFunctions[i].newHasher(); - bits += hashFunctions[i].bits(); - } - this.bits = bits; - } - - public Hasher putByte(byte b) - { - count += 1; - for (Hasher h : underlying) - h.putByte(b); - return this; - } - - public Hasher putBytes(byte[] bytes) - { - count += bytes.length; - for (Hasher h : underlying) - h.putBytes(bytes); - return this; - } - - public Hasher putBytes(byte[] bytes, int offset, int length) - { - count += length; - for (Hasher h : underlying) - h.putBytes(bytes, offset, length); - return this; - } - - public Hasher putBytes(ByteBuffer byteBuffer) - { - count += byteBuffer.remaining(); - for (Hasher h : underlying) - h.putBytes(byteBuffer.duplicate()); - return this; - } - - public Hasher putShort(short i) - { - count += Short.BYTES; - for (Hasher h : underlying) - h.putShort(i); - return this; - } - - public Hasher putInt(int i) - { - count += Integer.BYTES; - for (Hasher h : underlying) - h.putInt(i); - return this; - } - - public Hasher putLong(long l) - { - count += Long.BYTES; - for (Hasher h : underlying) - h.putLong(l); - return this; - } - - public Hasher putFloat(float v) - { - count += Float.BYTES; - for (Hasher h : underlying) - h.putFloat(v); - return this; - } - - public Hasher putDouble(double v) - { - count += Double.BYTES; - for (Hasher h : underlying) - h.putDouble(v); - return this; - } - - public Hasher putBoolean(boolean b) - { - count += Byte.BYTES; - for (Hasher h : underlying) - h.putBoolean(b); - return this; - } - - public Hasher putChar(char c) - { - count += Character.BYTES; - for (Hasher h : underlying) - h.putChar(c); - return this; - } - - public Hasher putUnencodedChars(CharSequence charSequence) - { - throw new UnsupportedOperationException(); - } - - public Hasher putString(CharSequence charSequence, Charset charset) - { - throw new UnsupportedOperationException(); - } - - public <T> Hasher putObject(T t, Funnel<? super T> funnel) - { - throw new UnsupportedOperationException(); - } - - public HashCode hash() - { - byte[] res = new byte[bits / 8]; - int i = 0; - for (Hasher hasher : underlying) - { - HashCode newHash = hasher.hash(); - i += newHash.writeBytesTo(res, i, newHash.bits() / 8); - } - return HashCode.fromBytes(res); - } - - public long getCount() - { - return count; - } - } - private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition) { validated++; // MerkleTree uses XOR internally, so we want lots of output bits here - CountingHasher hasher = new CountingHasher(); - UnfilteredRowIterators.digest(partition, hasher, MessagingService.current_version); + Digest digest = Digest.forValidator(); + UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version); // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979 - return hasher.count > 0 - ? new MerkleTree.RowHash(partition.partitionKey().getToken(), hasher.hash().asBytes(), hasher.count) + return digest.inputBytes() > 0 + ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.inputBytes()) : null; } diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java index e51a31b..82bd2cb 100644 --- a/src/java/org/apache/cassandra/schema/SchemaConstants.java +++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java @@ -26,7 +26,7 @@ import java.util.regex.Pattern; import com.google.common.collect.ImmutableSet; -import org.apache.cassandra.utils.HashingUtils; +import org.apache.cassandra.db.Digest; public final class SchemaConstants { @@ -66,7 +66,7 @@ public final class SchemaConstants static { - emptyVersion = UUID.nameUUIDFromBytes(HashingUtils.CURRENT_HASH_FUNCTION.newHasher().hash().asBytes()); + emptyVersion = UUID.nameUUIDFromBytes(Digest.forSchema().digest()); } /** diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index f31cf37..76bda0f 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -26,7 +26,6 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; import com.google.common.collect.Maps; -import com.google.common.hash.Hasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +47,6 @@ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.HashingUtils; import static java.lang.String.format; @@ -354,8 +352,7 @@ public final class SchemaKeyspace */ static UUID calculateSchemaDigest() { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - + Digest digest = Digest.forSchema(); for (String table : ALL) { // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and @@ -372,12 +369,12 @@ public final class SchemaKeyspace try (RowIterator partition = schema.next()) { if (!isSystemKeyspaceSchemaPartition(partition.partitionKey())) - RowIterators.digest(partition, hasher); + RowIterators.digest(partition, digest); } } } } - return UUID.nameUUIDFromBytes(hasher.hash().asBytes()); + return UUID.nameUUIDFromBytes(digest.digest()); } /** diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index f0d9132..1797087 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -22,6 +22,8 @@ import java.lang.reflect.Field; import java.math.BigInteger; import java.net.*; import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.*; import java.util.zip.CRC32; @@ -101,6 +103,18 @@ public class FBUtilities public static final int MAX_UNSIGNED_SHORT = 0xFFFF; + public static MessageDigest newMessageDigest(String algorithm) + { + try + { + return MessageDigest.getInstance(algorithm); + } + catch (NoSuchAlgorithmException nsae) + { + throw new RuntimeException("the requested digest algorithm (" + algorithm + ") is not available", nsae); + } + } + /** * Please use getJustBroadcastAddress instead. You need this only when you have to listen/connect. It's also missing * the port you should be using. 99% of code doesn't want this. diff --git a/src/java/org/apache/cassandra/utils/HashingUtils.java b/src/java/org/apache/cassandra/utils/HashingUtils.java deleted file mode 100644 index 9e65a5d..0000000 --- a/src/java/org/apache/cassandra/utils/HashingUtils.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.utils; - -import java.nio.ByteBuffer; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; - -public class HashingUtils -{ - public static final HashFunction CURRENT_HASH_FUNCTION = Hashing.md5(); - - public static MessageDigest newMessageDigest(String algorithm) - { - try - { - return MessageDigest.getInstance(algorithm); - } - catch (NoSuchAlgorithmException nsae) - { - throw new RuntimeException("the requested digest algorithm (" + algorithm + ") is not available", nsae); - } - } - - public static void updateBytes(Hasher hasher, ByteBuffer input) - { - if (!input.hasRemaining()) - return; - - if (input.hasArray()) - { - byte[] b = input.array(); - int ofs = input.arrayOffset(); - int pos = input.position(); - int lim = input.limit(); - hasher.putBytes(b, ofs + pos, lim - pos); - input.position(lim); - } - else - { - int len = input.remaining(); - int n = Math.min(len, 1 << 12); // either the remaining amount or 4kb - byte[] tempArray = new byte[n]; - while (len > 0) - { - int chunk = Math.min(len, tempArray.length); - input.get(tempArray, 0, chunk); - hasher.putBytes(tempArray, 0, chunk); - len -= chunk; - } - } - } - - public static void updateWithShort(Hasher hasher, int val) - { - hasher.putByte((byte) ((val >> 8) & 0xFF)); - hasher.putByte((byte) (val & 0xFF)); - } - - public static void updateWithByte(Hasher hasher, int val) - { - hasher.putByte((byte) (val & 0xFF)); - } - - public static void updateWithInt(Hasher hasher, int val) - { - hasher.putByte((byte) ((val >>> 24) & 0xFF)); - hasher.putByte((byte) ((val >>> 16) & 0xFF)); - hasher.putByte((byte) ((val >>> 8) & 0xFF)); - hasher.putByte((byte) ((val >>> 0) & 0xFF)); - } - - public static void updateWithLong(Hasher hasher, long val) - { - hasher.putByte((byte) ((val >>> 56) & 0xFF)); - hasher.putByte((byte) ((val >>> 48) & 0xFF)); - hasher.putByte((byte) ((val >>> 40) & 0xFF)); - hasher.putByte((byte) ((val >>> 32) & 0xFF)); - hasher.putByte((byte) ((val >>> 24) & 0xFF)); - hasher.putByte((byte) ((val >>> 16) & 0xFF)); - hasher.putByte((byte) ((val >>> 8) & 0xFF)); - hasher.putByte((byte) ((val >>> 0) & 0xFF)); - } - - public static void updateWithBoolean(Hasher hasher, boolean val) - { - updateWithByte(hasher, val ? 0 : 1); - } -} diff --git a/src/java/org/apache/cassandra/utils/MD5Digest.java b/src/java/org/apache/cassandra/utils/MD5Digest.java index 5c0c1de..d542991 100644 --- a/src/java/org/apache/cassandra/utils/MD5Digest.java +++ b/src/java/org/apache/cassandra/utils/MD5Digest.java @@ -46,7 +46,7 @@ public class MD5Digest @Override protected MessageDigest initialValue() { - return HashingUtils.newMessageDigest("MD5"); + return FBUtilities.newMessageDigest("MD5"); } @Override diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index 103042d..c83e292 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -403,15 +403,35 @@ public class UUIDGen long pid = NativeLibrary.getProcessID(); if (pid < 0) pid = new Random(System.currentTimeMillis()).nextLong(); - HashingUtils.updateWithLong(hasher, pid); + updateWithLong(hasher, pid); ClassLoader loader = UUIDGen.class.getClassLoader(); int loaderId = loader != null ? System.identityHashCode(loader) : 0; - HashingUtils.updateWithInt(hasher, loaderId); + updateWithInt(hasher, loaderId); return hasher.hash().asBytes(); } + private static void updateWithInt(Hasher hasher, int val) + { + hasher.putByte((byte) ((val >>> 24) & 0xFF)); + hasher.putByte((byte) ((val >>> 16) & 0xFF)); + hasher.putByte((byte) ((val >>> 8) & 0xFF)); + hasher.putByte((byte) ((val >>> 0) & 0xFF)); + } + + public static void updateWithLong(Hasher hasher, long val) + { + hasher.putByte((byte) ((val >>> 56) & 0xFF)); + hasher.putByte((byte) ((val >>> 48) & 0xFF)); + hasher.putByte((byte) ((val >>> 40) & 0xFF)); + hasher.putByte((byte) ((val >>> 32) & 0xFF)); + hasher.putByte((byte) ((val >>> 24) & 0xFF)); + hasher.putByte((byte) ((val >>> 16) & 0xFF)); + hasher.putByte((byte) ((val >>> 8) & 0xFF)); + hasher.putByte((byte) ((val >>> 0) & 0xFF)); + } + /** * Helper function used exclusively by UUIDGen to create **/ diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 0852312..7ed8a60 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -23,14 +23,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import com.google.common.hash.Hasher; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import com.github.benmanes.caffeine.cache.Weigher; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.UTF8Type; @@ -45,8 +44,8 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.HashingUtils; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; @@ -105,11 +104,11 @@ public class CacheProviderTest private void assertDigests(IRowCacheEntry one, CachedBTreePartition two) { assertTrue(one instanceof CachedBTreePartition); - Hasher h1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - Hasher h2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), h1, MessagingService.current_version); - UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), h2, MessagingService.current_version); - Assert.assertEquals(h1.hash(), h2.hash()); + Digest d1 = Digest.forReadResponse(); + Digest d2 = Digest.forReadResponse(); + UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version); + UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version); + assertArrayEquals(d1.digest(), d2.digest()); } private void concurrentCase(final CachedBTreePartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException diff --git a/test/unit/org/apache/cassandra/db/CounterCellTest.java b/test/unit/org/apache/cassandra/db/CounterCellTest.java index 36f8c92..4ce9802 100644 --- a/test/unit/org/apache/cassandra/db/CounterCellTest.java +++ b/test/unit/org/apache/cassandra/db/CounterCellTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; -import com.google.common.hash.Hasher; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -263,8 +262,8 @@ public class CounterCellTest ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1); ByteBuffer col = ByteBufferUtil.bytes("val"); - Hasher hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - Hasher hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); + Digest digest1 = Digest.forReadResponse(); + Digest digest2 = Digest.forReadResponse(); CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 2, 2); state.writeRemote(CounterId.fromInt(1), 4L, 4L); @@ -277,10 +276,10 @@ public class CounterCellTest ColumnMetadata cDef = cfs.metadata().getColumn(col); Cell cleared = BufferCell.live(cDef, 5, CounterContext.instance().clearAllLocal(state.context)); - original.digest(hasher1); - cleared.digest(hasher2); + original.digest(digest1); + cleared.digest(digest2); - Assert.assertEquals(hasher1.hash(), hasher2.hash()); + assertArrayEquals(digest1.digest(), digest2.digest()); } @Test @@ -297,9 +296,9 @@ public class CounterCellTest builder.addCell(emptyCell); Row row = builder.build(); - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - row.digest(hasher); - assertNotNull(hasher.hash()); + Digest digest = Digest.forReadResponse(); + row.digest(digest); + assertNotNull(digest.digest()); } } diff --git a/test/unit/org/apache/cassandra/db/DigestTest.java b/test/unit/org/apache/cassandra/db/DigestTest.java new file mode 100644 index 0000000..4fd12d0 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/DigestTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import com.google.common.hash.Hashing; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.UUIDGen; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class DigestTest +{ + private static final Logger logger = LoggerFactory.getLogger(DigestTest.class); + + @Test + public void hashEmptyBytes() throws Exception { + Assert.assertArrayEquals(Hex.hexToBytes("d41d8cd98f00b204e9800998ecf8427e"), + Digest.forReadResponse().update(ByteBufferUtil.EMPTY_BYTE_BUFFER).digest()); + } + + @Test + public void hashBytesFromTinyDirectByteBuffer() throws Exception { + ByteBuffer directBuf = ByteBuffer.allocateDirect(8); + directBuf.putLong(5L).position(0); + directBuf.position(0); + assertArrayEquals(Hex.hexToBytes("aaa07454fa93ed2d37b4c5da9f2f87fd"), + Digest.forReadResponse().update(directBuf).digest()); + } + + @Test + public void hashBytesFromLargerDirectByteBuffer() throws Exception { + ByteBuffer directBuf = ByteBuffer.allocateDirect(1024); + for (int i = 0; i < 100; i++) { + directBuf.putInt(i); + } + directBuf.position(0); + assertArrayEquals(Hex.hexToBytes("daf10ea8894783b1b2618309494cde21"), + Digest.forReadResponse().update(directBuf).digest()); + } + + @Test + public void hashBytesFromTinyOnHeapByteBuffer() throws Exception { + ByteBuffer onHeapBuf = ByteBuffer.allocate(8); + onHeapBuf.putLong(5L); + onHeapBuf.position(0); + assertArrayEquals(Hex.hexToBytes("aaa07454fa93ed2d37b4c5da9f2f87fd"), + Digest.forReadResponse().update(onHeapBuf).digest()); + } + + @Test + public void hashBytesFromLargerOnHeapByteBuffer() throws Exception { + ByteBuffer onHeapBuf = ByteBuffer.allocate(1024); + for (int i = 0; i < 100; i++) { + onHeapBuf.putInt(i); + } + onHeapBuf.position(0); + assertArrayEquals(Hex.hexToBytes("daf10ea8894783b1b2618309494cde21"), + Digest.forReadResponse().update(onHeapBuf).digest()); + } + + @Test + public void testValidatorDigest() + { + Digest[] digests = new Digest[] + { + Digest.forValidator(), + new Digest(Hashing.murmur3_128(1000).newHasher()), + new Digest(Hashing.murmur3_128(2000).newHasher()) + }; + byte [] random = UUIDGen.getTimeUUIDBytes(); + + for (Digest digest : digests) + { + digest.updateWithByte((byte) 33) + .update(random, 0, random.length) + .update(ByteBuffer.wrap(random)) + .update(random, 0, 3) + .updateWithBoolean(false) + .updateWithInt(77) + .updateWithLong(101); + } + + long len = Byte.BYTES + + random.length * 2 // both the byte[] and the ByteBuffer + + 3 // 3 bytes from the random byte[] + + Byte.BYTES + + Integer.BYTES + + Long.BYTES; + + assertEquals(len, digests[0].inputBytes()); + byte[] h = digests[0].digest(); + assertArrayEquals(digests[1].digest(), Arrays.copyOfRange(h, 0, 16)); + assertArrayEquals(digests[2].digest(), Arrays.copyOfRange(h, 16, 32)); + } + +} diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java index 38fb38a..be3a9e4 100644 --- a/test/unit/org/apache/cassandra/db/PartitionTest.java +++ b/test/unit/org/apache/cassandra/db/PartitionTest.java @@ -20,11 +20,13 @@ package org.apache.cassandra.db; import java.io.IOException; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; -import com.google.common.hash.Hasher; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; + +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredRowIterators; @@ -40,9 +42,9 @@ import org.apache.cassandra.Util; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.HashingUtils; import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -138,28 +140,22 @@ public class PartitionTest ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(cmd1); ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(cmd2); - Hasher hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - Hasher hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), hasher1, version); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), hasher2, version); - Assert.assertFalse(hasher1.hash().equals(hasher2.hash())); + byte[] digest1 = getDigest(p1.unfilteredIterator(), version); + byte[] digest2 = getDigest(p2.unfilteredIterator(), version); + assertFalse(Arrays.equals(digest1, digest2)); p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), hasher1, version); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), hasher2, version); - Assert.assertEquals(hasher1.hash(), hasher2.hash()); + digest1 = getDigest(p1.unfilteredIterator(), version); + digest2 = getDigest(p2.unfilteredIterator(), version); + assertArrayEquals(digest1, digest2); p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); RowUpdateBuilder.deleteRow(cfs.metadata(), 6, "key2", "c").applyUnsafe(); p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), hasher1, version); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), hasher2, version); - Assert.assertFalse(hasher1.hash().equals(hasher2.hash())); + digest1 = getDigest(p1.unfilteredIterator(), version); + digest2 = getDigest(p2.unfilteredIterator(), version); + assertFalse(Arrays.equals(digest1, digest2)); } finally { @@ -167,6 +163,13 @@ public class PartitionTest } } + private byte[] getDigest(UnfilteredRowIterator partition, int version) + { + Digest digest = Digest.forReadResponse(); + UnfilteredRowIterators.digest(partition, digest, version); + return digest.digest(); + } + @Test public void testColumnStatsRecordsRowDeletesCorrectly() { diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 8b73502..c04f489 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.CounterColumnType; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; @@ -72,6 +73,7 @@ import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ReadCommandTest { @@ -82,6 +84,7 @@ public class ReadCommandTest private static final String CF4 = "Standard4"; private static final String CF5 = "Standard5"; private static final String CF6 = "Standard6"; + private static final String CF7 = "Counter7"; private static final InetAddressAndPort REPAIR_COORDINATOR; static { @@ -151,6 +154,13 @@ public class ReadCommandTest .addRegularColumn("b", AsciiType.instance) .caching(CachingParams.CACHE_EVERYTHING); + TableMetadata.Builder metadata7 = + TableMetadata.builder(KEYSPACE, CF7) + .flags(EnumSet.of(TableMetadata.Flag.COUNTER)) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("c", CounterColumnType.instance); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -159,7 +169,8 @@ public class ReadCommandTest metadata3, metadata4, metadata5, - metadata6); + metadata6, + metadata7); LocalSessionAccessor.startup(); } @@ -647,6 +658,66 @@ public class ReadCommandTest assertEquals(1, readCount(sstables.get(1))); } + @Test + public void dontIncludeLegacyCounterContextInDigest() throws IOException + { + // Serializations of a CounterContext containing legacy (pre-2.1) shards + // can legitimately differ across replicas. For this reason, the context + // bytes are omitted from the repaired digest if they contain legacy shards. + // This clearly has a tradeoff with the efficacy of the digest, without doing + // so false positive digest mismatches will be reported for scenarios where + // there is nothing that can be done to "fix" the replicas + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF7); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + // insert a row with the counter column having value 0, in a legacy shard. + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) + .clustering("aa") + .addLegacyCounterCell("c", 0L) + .build() + .apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + + // execute a read and capture the digest + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); + ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true); + assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0)); + + // truncate, then re-insert the same partition, but this time with a legacy + // shard having the value 1. The repaired digest should match the previous, as + // the values (context) are not included, only the cell metadata (ttl, timestamp, etc) + cfs.truncateBlocking(); + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) + .clustering("aa") + .addLegacyCounterCell("c", 1L) + .build() + .apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + + ByteBuffer digestWithLegacyCounter1 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true); + assertEquals(digestWithLegacyCounter0, digestWithLegacyCounter1); + + // truncate, then re-insert the same partition, but this time with a non-legacy + // counter cell present. The repaired digest should not match the previous ones + // as this time the value (context) is included. + cfs.truncateBlocking(); + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) + .clustering("aa") + .add("c", 1L) + .build() + .apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + + ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true); + assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithCounterCell)); + assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell)); + assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell)); + } + private long readCount(SSTableReader sstable) { return sstable.getReadMeter().count(); @@ -810,10 +881,19 @@ public class ReadCommandTest } } - private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException + private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) { - sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false); - sstable.reloadSSTableMetadata(); + try + { + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false); + sstable.reloadSSTableMetadata(); + } + catch (IOException e) + { + e.printStackTrace(); + fail("Caught IOException when mutating sstable metadata"); + } + if (pendingSession != null) { // setup a minimal repair session. This is necessary because we diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java index 1ac5440..3a07a00 100644 --- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java @@ -20,6 +20,8 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.rows.*; @@ -181,4 +183,12 @@ public class RowUpdateBuilder { return delete(columnMetadata.name.toString()); } + + public RowUpdateBuilder addLegacyCounterCell(String columnName, long value) + { + assert updateBuilder.metadata().getColumn(new ColumnIdentifier(columnName, true)).isCounterColumn(); + ByteBuffer val = CounterContext.instance().createLocal(value); + rowBuilder().add(columnName, val); + return this; + } } diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index a288edb..cf3411a 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -28,8 +27,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import com.google.common.hash.Hasher; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionsTest; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -61,6 +58,7 @@ import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -197,8 +195,8 @@ public class ValidatorTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); UUID repairSessionId = UUIDGen.getTimeUUID(); final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(), - cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(), - sstable.last.getToken()))); + cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(), + sstable.last.getToken()))); InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2"); @@ -371,45 +369,6 @@ public class ValidatorTest return left; } - @Test - public void testCountingHasher() - { - Hasher [] hashers = new Hasher[] {new Validator.CountingHasher(), Validator.CountingHasher.hashFunctions[0].newHasher(), Validator.CountingHasher.hashFunctions[1].newHasher() }; - byte [] random = UUIDGen.getTimeUUIDBytes(); - - // call all overloaded methods: - for (Hasher hasher : hashers) - { - hasher.putByte((byte) 33) - .putBytes(random) - .putBytes(ByteBuffer.wrap(random)) - .putBytes(random, 0, 3) - .putChar('a') - .putBoolean(false) - .putDouble(3.3) - .putInt(77) - .putFloat(99) - .putLong(101) - .putShort((short) 23); - } - - long len = Byte.BYTES - + random.length * 2 // both the byte[] and the ByteBuffer - + 3 // 3 bytes from the random byte[] - + Character.BYTES - + Byte.BYTES - + Double.BYTES - + Integer.BYTES - + Float.BYTES - + Long.BYTES - + Short.BYTES; - - byte [] h = hashers[0].hash().asBytes(); - assertTrue(Arrays.equals(hashers[1].hash().asBytes(), Arrays.copyOfRange(h, 0, 16))); - assertTrue(Arrays.equals(hashers[2].hash().asBytes(), Arrays.copyOfRange(h, 16, 32))); - assertEquals(len, ((Validator.CountingHasher)hashers[0]).getCount()); - } - private CompletableFuture<Message> registerOutgoingMessageSink() { final CompletableFuture<Message> future = new CompletableFuture<>(); diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java index 8881018..8ec0177 100644 --- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java +++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java @@ -25,13 +25,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.Test; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.TreeResponse; -import org.apache.cassandra.utils.HashingUtils; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.MerkleTreesTest; @@ -43,7 +43,9 @@ public class DifferenceHolderTest { private static byte[] digest(String string) { - return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes()); + return Digest.forValidator() + .update(string.getBytes(), 0, string.getBytes().length) + .digest(); } @Test diff --git a/test/unit/org/apache/cassandra/utils/HashingUtilsTest.java b/test/unit/org/apache/cassandra/utils/HashingUtilsTest.java deleted file mode 100644 index 3988903..0000000 --- a/test/unit/org/apache/cassandra/utils/HashingUtilsTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.utils; - -import java.nio.ByteBuffer; - -import com.google.common.hash.Hasher; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HashingUtilsTest -{ - private static final Logger logger = LoggerFactory.getLogger(HashingUtilsTest.class); - - @Test - public void hashEmptyBytes() throws Exception { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - HashingUtils.updateBytes(hasher, ByteBuffer.wrap(new byte[]{})); - String md5HashInHexOfEmptyByteArray = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes())); - Assert.assertEquals("d41d8cd98f00b204e9800998ecf8427e", md5HashInHexOfEmptyByteArray); - } - - @Test - public void hashBytesFromTinyDirectByteBuffer() throws Exception { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - ByteBuffer directBuf = ByteBuffer.allocateDirect(8); - directBuf.putLong(5L); - directBuf.position(0); - HashingUtils.updateBytes(hasher, directBuf); - - String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes())); - Assert.assertEquals("aaa07454fa93ed2d37b4c5da9f2f87fd", md5HashInHexOfDirectByteBuffer); - } - - @Test - public void hashBytesFromLargerDirectByteBuffer() throws Exception { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - ByteBuffer directBuf = ByteBuffer.allocateDirect(1024); - for (int i = 0; i < 100; i++) { - directBuf.putInt(i); - } - directBuf.position(0); - HashingUtils.updateBytes(hasher, directBuf); - - String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes())); - Assert.assertEquals("daf10ea8894783b1b2618309494cde21", md5HashInHexOfDirectByteBuffer); - } - - @Test - public void hashBytesFromTinyOnHeapByteBuffer() throws Exception { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - ByteBuffer onHeapBuf = ByteBuffer.allocate(8); - onHeapBuf.putLong(5L); - onHeapBuf.position(0); - HashingUtils.updateBytes(hasher, onHeapBuf); - - String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes())); - Assert.assertEquals("aaa07454fa93ed2d37b4c5da9f2f87fd", md5HashInHexOfDirectByteBuffer); - } - - @Test - public void hashBytesFromLargerOnHeapByteBuffer() throws Exception { - Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); - ByteBuffer onHeapBuf = ByteBuffer.allocate(1024); - for (int i = 0; i < 100; i++) { - onHeapBuf.putInt(i); - } - onHeapBuf.position(0); - HashingUtils.updateBytes(hasher, onHeapBuf); - - String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes())); - Assert.assertEquals("daf10ea8894783b1b2618309494cde21", md5HashInHexOfDirectByteBuffer); - } -} diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java index 36ae4a0..1cdcc22 100644 --- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java @@ -26,6 +26,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -48,9 +49,11 @@ public class MerkleTreeTest { private static final byte[] DUMMY = digest("dummy"); - private static byte[] digest(String string) + static byte[] digest(String string) { - return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes()); + return Digest.forValidator() + .update(string.getBytes(), 0, string.getBytes().length) + .digest(); } /** diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java index 9e70c20..5b589fb 100644 --- a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java +++ b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java @@ -38,17 +38,13 @@ import org.apache.cassandra.utils.MerkleTree.RowHash; import org.apache.cassandra.utils.MerkleTree.TreeRange; import org.apache.cassandra.utils.MerkleTrees.TreeRangeIterator; +import static org.apache.cassandra.utils.MerkleTreeTest.digest; import static org.junit.Assert.*; public class MerkleTreesTest { private static final byte[] DUMMY = digest("dummy"); - private static byte[] digest(String string) - { - return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes()); - } - /** * If a test assumes that the tree is 8 units wide, then it should set this value * to 8. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org