Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39bcdcd3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39bcdcd3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39bcdcd3 Branch: refs/heads/trunk Commit: 39bcdcd32ea5dcfd372d4b3eaf10c1c3de0547fb Parents: 8b7e967 d2248f2 Author: Aleksey Yeshchenko <alek...@apple.com> Authored: Fri Mar 23 14:49:51 2018 +0000 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Fri Mar 23 14:49:51 2018 +0000 ---------------------------------------------------------------------- CHANGES.txt | 3 ++- .../org/apache/cassandra/db/ReadResponse.java | 22 +++++++++++++++- .../db/rows/RangeTombstoneBoundMarker.java | 2 +- .../db/rows/RangeTombstoneBoundaryMarker.java | 5 +++- .../reads/repair/RowIteratorMergeListener.java | 27 +++++++++++++++----- 5 files changed, 49 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index d4e5e37,987b9f7..76803a8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,213 -1,3 +1,212 @@@ +4.0 + * Fix scheduling of speculative retry threshold recalculation (CASSANDRA-14338) + * Add support for hybrid MIN(), MAX() speculative retry policies (CASSANDRA-14293) + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314) - * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252) + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315) + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322) + * Add ability to specify driver name and version (CASSANDRA-14275) + * Abstract streaming for pluggable storage (CASSANDRA-14115) + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294) + * Use 
Murmur3 for validation compactions (CASSANDRA-14002) + * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285) + * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058) + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993) + * Add a few options to nodetool verify (CASSANDRA-14201) + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183) + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259) + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226) + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132) + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214) + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174) + * Add nodetool clientlist (CASSANDRA-13665) + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211) + * Non-disruptive seed node list reload (CASSANDRA-14190) + * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185) + * Migrate dtests to use pytest and python3 (CASSANDRA-14134) + * Allow storage port to be configurable per node (CASSANDRA-7544) + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182) + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067) + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152) + * Delete temp test files on exit (CASSANDRA-14153) + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867) + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066) + * Fix cassandra-stress startup failure (CASSANDRA-14106) + * Remove initialDirectories from CFS (CASSANDRA-13928) + * Fix trivial log format error (CASSANDRA-14015) + * Allow sstabledump to do a json object per partition (CASSANDRA-13848) + * Add option to 
optimise merkle tree comparison across replicas (CASSANDRA-3200) + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081) + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090) + * Support a means of logging all queries as they were invoked (CASSANDRA-13983) + * Presize collections (CASSANDRA-13760) + * Add GroupCommitLogService (CASSANDRA-13530) + * Parallelize initial materialized view build (CASSANDRA-12245) + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965) + * Make LWTs send resultset metadata on every request (CASSANDRA-13992) + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963) + * Introduce leaf-only iterator (CASSANDRA-9988) + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997) + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182) + * Refactoring to specialised functional interfaces (CASSANDRA-13982) + * Speculative retry should allow more friendly params (CASSANDRA-13876) + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944) + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291) + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728) + * Fix some alerts raised by static analysis (CASSANDRA-13799) + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593) + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786) + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941) + * Expose recent histograms in JmxHistograms (CASSANDRA-13642) + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899) + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906) + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925) + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961) + * Correctly close netty 
channels when a stream session ends (CASSANDRA-13905) + * Update lz4 to 1.4.0 (CASSANDRA-13741) + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862) + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299) + * Use compaction threshold for STCS in L0 (CASSANDRA-13861) + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703) + * Add extra information to SASI timeout exception (CASSANDRA-13677) + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818) + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786) + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846) + * Add keyspace and table name in schema validation exception (CASSANDRA-13845) + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771) + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837) + * Race condition when closing stream sessions (CASSANDRA-13852) + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831) + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696) + * Add stress profile yaml with LWT (CASSANDRA-7960) + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789) + * Simplify mx4j configuration (Cassandra-13578) + * Fix trigger example on 4.0 (CASSANDRA-13796) + * Force minumum timeout value (CASSANDRA-9375) + * Use netty for streaming (CASSANDRA-12229) + * Use netty for internode messaging (CASSANDRA-8457) + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774) + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758) + * Fix pending repair manager index out of bounds check (CASSANDRA-13769) + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576) + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664) + * Use an ExecutorService 
for repair commands instead of new Thread(..).start() (CASSANDRA-13594) + * Fix race / ref leak in anticompaction (CASSANDRA-13688) + * Expose tasks queue length via JMX (CASSANDRA-12758) + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751) + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615) + * Improve sstablemetadata output (CASSANDRA-11483) + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371) + * Introduce error metrics for repair (CASSANDRA-13387) + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732) + * Update metrics to 3.1.5 (CASSANDRA-13648) + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699) + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725) + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727) + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996) + * Default for start_native_transport now true if not set in config (CASSANDRA-13656) + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583) + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148) + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271) + * Use common nowInSec for validation compactions (CASSANDRA-13671) + * Improve handling of IR prepare failures (CASSANDRA-13672) + * Send IR coordinator messages synchronously (CASSANDRA-13673) + * Flush system.repair table before IR finalize promise (CASSANDRA-13660) + * Fix column filter creation for wildcard queries (CASSANDRA-13650) + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614) + * fix race condition in PendingRepairManager (CASSANDRA-13659) + * Allow noop incremental repair state transitions (CASSANDRA-13658) + * Run repair with down replicas (CASSANDRA-10446) + * Added started & completed repair metrics (CASSANDRA-13598) + * Added started & completed repair 
metrics (CASSANDRA-13598) + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130) + * Improve calculation of available disk space for compaction (CASSANDRA-13068) + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579) + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570) + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585) + * Fix Randomness of stress values (CASSANDRA-12744) + * Allow selecting Map values and Set elements (CASSANDRA-7396) + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444) + * Update repairTime for keyspaces on completion (CASSANDRA-13539) + * Add configurable upper bound for validation executor threads (CASSANDRA-13521) + * Bring back maxHintTTL propery (CASSANDRA-12982) + * Add testing guidelines (CASSANDRA-13497) + * Add more repair metrics (CASSANDRA-13531) + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650) + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367) + * Log time elapsed for each incremental repair phase (CASSANDRA-13498) + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780) + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262) + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421) + * Improve incremental repair logging (CASSANDRA-13468) + * Start compaction when incremental repair finishes (CASSANDRA-13454) + * Add repair streaming preview (CASSANDRA-13257) + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354) + * Skip building views during base table streams on range 
movements (CASSANDRA-13065) + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197) + * Remove deprecated repair JMX APIs (CASSANDRA-11530) + * Fix version check to enable streaming keep-alive (CASSANDRA-12929) + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289) + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324) + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360) + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359) + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336) + * Incremental repair not streaming correct sstables (CASSANDRA-13328) + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300) + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132) + * Remove config option index_interval (CASSANDRA-10671) + * Reduce lock contention for collection types and serializers (CASSANDRA-13271) + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283) + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292) + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520) + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226) + * Remove unused method (CASSANDRA-13227) + * Fix minor bugs related to #9143 (CASSANDRA-13217) + * Output warning if user increases RF (CASSANDRA-13079) + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081) + * Add support for + and - operations on dates (CASSANDRA-11936) + * Fix consistency of incrementally repaired data (CASSANDRA-9143) + * Increase commitlog version (CASSANDRA-13161) + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425) + * Refactor ColumnCondition (CASSANDRA-12981) + * Parallelize streaming of different keyspaces (CASSANDRA-4663) + * Improved compactions metrics (CASSANDRA-13015) + * Speed-up start-up sequence by avoiding un-needed 
flushes (CASSANDRA-13031) + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855) + * Thrift removal (CASSANDRA-11115) + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716) + * Add column definition kind to dropped columns in schema (CASSANDRA-12705) + * Add (automate) Nodetool Documentation (CASSANDRA-12672) + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736) + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681) + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422) + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080) + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084) + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510) + * Allow IN restrictions on column families with collections (CASSANDRA-12654) + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028) + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029) + * Add mutation size and batch metrics (CASSANDRA-12649) + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) + * Expose time spent waiting in thread pool queue (CASSANDRA-8398) + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969) + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946) + * Add support for arithmetic operators (CASSANDRA-11935) + * Add histogram for delay to deliver hints (CASSANDRA-13234) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720) + * Trivial format error in StorageProxy (CASSANDRA-13551) + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480) + * Anticompaction can cause noisy log messages (CASSANDRA-13684) + * Switch to client init for sstabledump 
(CASSANDRA-13683) + * CQLSH: Don't pause when capturing data (CASSANDRA-13743) + * nodetool clearsnapshot requires --all to clear all snapshots (CASSANDRA-13391) + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527) + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243) + + 3.11.3 * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247) * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170) http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadResponse.java index 2c06f65,7aa915e..486980d --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@@ -29,11 -36,12 +29,12 @@@ import org.apache.cassandra.db.rows.* import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.thrift.ThriftResultsMerger; ++import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.HashingUtils; public abstract class ReadResponse { @@@ -87,11 -114,31 +88,30 @@@ return "<key " + key + " not found>"; } - private String toDebugString(UnfilteredRowIterator partition, CFMetaData metadata) ++ private String toDebugString(UnfilteredRowIterator partition, TableMetadata metadata) + { + StringBuilder sb = new StringBuilder(); + - sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s", - 
metadata.ksName, - metadata.cfName, - metadata.getKeyValidator().getString(partition.partitionKey().getKey()), ++ sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s", ++ metadata, ++ metadata.partitionKeyType.getString(partition.partitionKey().getKey()), + partition.partitionLevelDeletion(), + partition.columns())); + + if (partition.staticRow() != Rows.EMPTY_STATIC_ROW) + sb.append("\n ").append(partition.staticRow().toString(metadata, true)); + + while (partition.hasNext()) + sb.append("\n ").append(partition.next().toString(metadata, true)); + + return sb.toString(); + } + protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command) { - MessageDigest digest = FBUtilities.threadLocalMD5Digest(); - UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion()); - return ByteBuffer.wrap(digest.digest()); + Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher(); + UnfilteredPartitionIterators.digest(iterator, hasher, command.digestVersion()); + return ByteBuffer.wrap(hasher.hash().asBytes()); } private static class DigestResponse extends ReadResponse http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java index a1f2b2c,f6ba149..c0c6afd --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@@ -127,15 -127,21 +127,15 @@@ public class RangeTombstoneBoundMarker return new RangeTombstoneBoundMarker(clustering(), newDeletionTime); } - public void digest(MessageDigest digest) - { - bound.digest(digest); - deletion.digest(digest); - } - - @Override - public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + public void digest(Hasher hasher) { - 
digest(digest); + bound.digest(hasher); + deletion.digest(hasher); } - public String toString(CFMetaData metadata) + public String toString(TableMetadata metadata) { - return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt(); + return String.format("Marker %s@%d/%d", bound.toString(metadata), deletion.markedForDeleteAt(), deletion.localDeletionTime()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java index 60b7dab,f183309..6e6c8cd --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@@ -145,16 -145,25 +145,19 @@@ public class RangeTombstoneBoundaryMark return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion); } - public void digest(MessageDigest digest) - { - bound.digest(digest); - endDeletion.digest(digest); - startDeletion.digest(digest); - } - - @Override - public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + public void digest(Hasher hasher) { - digest(digest); + bound.digest(hasher); + endDeletion.digest(hasher); + startDeletion.digest(hasher); } - public String toString(CFMetaData metadata) + public String toString(TableMetadata metadata) { - return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt()); + return String.format("Marker %s@%d/%d-%d/%d", + bound.toString(metadata), + endDeletion.markedForDeleteAt(), endDeletion.localDeletionTime(), + startDeletion.markedForDeleteAt(), startDeletion.localDeletionTime()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java 
---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index 63bd3ce,0000000..44b6eeb mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@@ -1,336 -1,0 +1,351 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.Arrays; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowDiffListener; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; + +public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener +{ + private final DecoratedKey partitionKey; + private final RegularAndStaticColumns columns; + private final boolean isReversed; + private final InetAddressAndPort[] sources; + private final ReadCommand command; + + private final PartitionUpdate.Builder[] repairs; + + private final Row.Builder[] currentRows; + private final RowDiffListener diffListener; + + // The partition level deletion for the merge row. + private DeletionTime partitionLevelDeletion; + // When merged has a currently open marker, its time. null otherwise. 
+ private DeletionTime mergedDeletionTime; + // For each source, the time of the current deletion as known by the source. + private final DeletionTime[] sourceDeletionTime; + // For each source, record if there is an open range to send as repair, and from where. + private final ClusteringBound[] markerToRepair; + + private final RepairListener repairListener; + + public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener) + { + this.partitionKey = partitionKey; + this.columns = columns; + this.isReversed = isReversed; + this.sources = sources; + repairs = new PartitionUpdate.Builder[sources.length]; + currentRows = new Row.Builder[sources.length]; + sourceDeletionTime = new DeletionTime[sources.length]; + markerToRepair = new ClusteringBound[sources.length]; + this.command = command; + this.repairListener = repairListener; + + this.diffListener = new RowDiffListener() + { + public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged); + } + + public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addRowDeletion(merged); + } + + public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addComplexDeletion(column, merged); + } + + public void onCell(int i, Clustering clustering, Cell merged, Cell original) + { + if (merged != null && !merged.equals(original) && isQueried(merged)) + currentRow(i, clustering).addCell(merged); + } + + private boolean isQueried(Cell cell) + { + // When we read, we may have 
some cells that have been fetched but are not selected by the user. Those cells may + // have empty values as an optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair. + // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL + // semantics (making sure we can always distinguish between a row that doesn't exist from one that does exist but has + // no value for the column requested by the user) and so it won't be unexpected by the user that those columns are + // not repaired. + ColumnMetadata column = cell.column(); + ColumnFilter filter = RowIteratorMergeListener.this.command.columnFilter(); + return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column); + } + }; + } + + private PartitionUpdate.Builder update(int i) + { + if (repairs[i] == null) + repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1); + return repairs[i]; + } + + /** + * The partition level deletion with which source {@code i} is currently repaired, or + * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was + * up to date on it). The output of this method is only valid after the call to + * {@link #onMergedPartitionLevelDeletion}. + */ + private DeletionTime partitionLevelRepairDeletion(int i) + { + return repairs[i] == null ?
DeletionTime.LIVE : repairs[i].partitionLevelDeletion(); + } + + private Row.Builder currentRow(int i, Clustering clustering) + { + if (currentRows[i] == null) + { + currentRows[i] = BTreeRow.sortedBuilder(); + currentRows[i].newRow(clustering); + } + return currentRows[i]; + } + + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + this.partitionLevelDeletion = mergedDeletion; + for (int i = 0; i < versions.length; i++) + { + if (mergedDeletion.supersedes(versions[i])) + update(i).addPartitionDeletion(mergedDeletion); + } + } + + public void onMergedRows(Row merged, Row[] versions) + { + // If a row was shadowed post merge, it must be by a partition level or range tombstone, and we handle + // those cases directly in their respective methods (in other words, it would be inefficient to send a row + // deletion as repair when we know we've already sent a partition level or range tombstone that covers it). + if (merged.isEmpty()) + return; + + Rows.diff(diffListener, merged, versions); + for (int i = 0; i < currentRows.length; i++) + { + if (currentRows[i] != null) + update(i).add(currentRows[i].build()); + } + Arrays.fill(currentRows, null); + } + + private DeletionTime currentDeletion() + { + return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime; + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + try + { + // The code for merging range tombstones is a tad complex and we had the assertions there triggered + // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights + // when that happens without more context than what the assertion errors give us however, hence the + // catch here that basically gathers as much context as is reasonable.
+ internalOnMergedRangeTombstoneMarkers(merged, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); - String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}", ++ String details = String.format("Error merging RTs on %s: command=%s, reversed=%b, merged=%s, versions=%s, sources={%s}", + table, ++ command.toCQLString(), ++ isReversed, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + Arrays.toString(sources)); + throw new AssertionError(details, e); + } + } + + private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + // The current deletion as of dealing with this marker. + DeletionTime currentDeletion = currentDeletion(); + + for (int i = 0; i < versions.length; i++) + { + RangeTombstoneMarker marker = versions[i]; + + // Update what the source now thinks is the current deletion + if (marker != null) + sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null; + + // If merged == null, some of the sources are opening or closing a marker + if (merged == null) + { + // but if it's not this source, move to the next one + if (marker == null) + continue; + + // We have a close and/or open marker for a source, with nothing corresponding in merged. + // Because merged is a superset, this implies that we have a current deletion (be it due to an + // early opening in merged or a partition level deletion) and that this deletion will still be + // active after that point. Further, whatever deletion was open or is open by this marker on the + // source, that deletion cannot supersede the current one.
+ // + // But while the marker deletion (before and/or after this point) cannot supersede the current + // deletion, we want to know if it's equal to it (both before and after), because in that case + // the source is up to date and we don't want to include repair. + // + // So in practice we have 2 possible case: + // 1) the source was up-to-date on deletion up to that point: then it won't be from that point + // on unless it's a boundary and the new opened deletion time is also equal to the current + // deletion (note that this implies the boundary has the same closing and opening deletion + // time, which should generally not happen, but can due to legacy reading code not avoiding + // this for a while, see CASSANDRA-13237). + // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't + // we just have nothing to do for that marker). + assert !currentDeletion.isLive() : currentDeletion.toString(); + + // Is the source up to date on deletion? It's up to date if it doesn't have an open RT repair + // nor an "active" partition level deletion (where "active" means that it's greater or equal + // to the current deletion: if the source has a repaired partition deletion lower than the + // current deletion, this means the current deletion is due to a previously open range tombstone, + // and if the source isn't currently repaired for that RT, then it means it's up to date on it). + DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i); + if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion)) + { - // Since there is an ongoing merged deletion, the only way we don't have an open repair for - // this source is that it had a range open with the same deletion as current and it's - // closing it. 
- assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) - : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); ++ /* ++ * Since there is an ongoing merged deletion, the only two ways we don't have an open repair for ++ * this source are that: ++ * ++ * 1) it had a range open with the same deletion as current marker, and the marker is coming from ++ * a short read protection response - repeating the open RT bound, or ++ * 2) it had a range open with the same deletion as current marker, and the marker is closing it. ++ */ ++ if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1) ++ { ++ assert currentDeletion.equals(marker.openDeletionTime(isReversed)) ++ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); ++ } ++ else // (2) ++ { ++ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) ++ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); ++ } + + // and so unless it's a boundary whose opening deletion time is still equal to the current + // deletion (see comment above for why this can actually happen), we have to repair the source + // from that point on. + if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))) + markerToRepair[i] = marker.closeBound(isReversed).invert(); + } + // In case 2) above, we only have something to do if the source is up-to-date after that point + // (which, since the source isn't up-to-date before that point, means we're opening a new deletion + // that is equal to the current one). 
+ else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))) + { + closeOpenMarker(i, marker.openBound(isReversed).invert()); + } + } + else + { + // We have a change of current deletion in merged (potentially to/from no deletion at all). + + if (merged.isClose(isReversed)) + { + // We're closing the merged range. If we're recorded that this should be repaird for the + // source, close and add said range to the repair to send. + if (markerToRepair[i] != null) + closeOpenMarker(i, merged.closeBound(isReversed)); + + } + + if (merged.isOpen(isReversed)) + { + // If we're opening a new merged range (or just switching deletion), then unless the source + // is up to date on that deletion (note that we've updated what the source deleteion is + // above), we'll have to sent the range to the source. + DeletionTime newDeletion = merged.openDeletionTime(isReversed); + DeletionTime sourceDeletion = sourceDeletionTime[i]; + if (!newDeletion.equals(sourceDeletion)) + markerToRepair[i] = merged.openBound(isReversed); + } + } + } + + if (merged != null) + mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null; + } + + private void closeOpenMarker(int i, ClusteringBound close) + { + ClusteringBound open = markerToRepair[i]; + update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? 
open : close), currentDeletion())); + markerToRepair[i] = null; + } + + public void close() + { + RepairListener.PartitionRepair repair = null; + for (int i = 0; i < repairs.length; i++) + { + if (repairs[i] == null) + continue; + + if (repair == null) + { + repair = repairListener.startPartitionRepair(); + } + repair.reportMutation(sources[i], new Mutation(repairs[i].build())); + } + + if (repair != null) + { + repair.finish(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org