This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 663e51d92868cbd045a83d7fa53e373bb28721a0 Merge: 133ad50a84 a00d8fd5ba Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Thu Jun 2 12:07:38 2022 -0500 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 1 + .../org/apache/cassandra/repair/LocalSyncTask.java | 46 ++++----- .../apache/cassandra/streaming/StreamSession.java | 6 ++ .../cassandra/streaming/StreamingChannel.java | 9 ++ .../async/NettyStreamingConnectionFactory.java | 9 ++ .../async/StreamingMultiplexedChannel.java | 60 ++++++----- .../cassandra/tools/BulkLoadConnectionFactory.java | 21 +++- .../distributed/test/RepairErrorsTest.java | 114 +++++++++++++++++++-- 8 files changed, 208 insertions(+), 58 deletions(-) diff --cc CHANGES.txt index 07ccdf23a8,31c62caa57..cfb7bcb41a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,182 -1,7 +1,183 @@@ -4.0.5 +4.1-alpha2 + * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619) +Merged from 4.0: + * Ensure FileStreamTask cannot compromise shared channel proxy for system table when interrupted (CASSANDRA-17663) * silence benign SslClosedEngineException (CASSANDRA-17565) Merged from 3.11: +Merged from 3.0: + + +4.1-alpha1 + * Handle config parameters upper bound on startup; Fix auto_snapshot_ttl and paxos_purge_grace_period min unit validations (CASSANDRA-17571) + * Fix leak of non-standard Java types in our Exceptions as clients using JMX are unable to handle them. 
+ Remove useless validation that leads to unnecessary additional read of cassandra.yaml on startup (CASSANDRA-17638) + * Fix repair_request_timeout_in_ms and remove paxos_auto_repair_threshold_mb (CASSANDRA-17557) + * Incremental repair leaks SomeRepairFailedException after switch away from flatMap (CASSANDRA-17620) + * StorageService read threshold get methods throw NullPointerException due to not handling null configs (CASSANDRA-17593) + * Rename truncate_drop guardrail to drop_truncate_table (CASSANDRA-17592) + * nodetool enablefullquerylog can NPE when directory has no files (CASSANDRA-17595) + * Add auto_snapshot_ttl configuration (CASSANDRA-16790) + * List snapshots of dropped tables (CASSANDRA-16843) + * Add information whether sstables are dropped to SchemaChangeListener (CASSANDRA-17582) + * Add a pluggable memtable API (CEP-11 / CASSANDRA-17034) + * Save sstable id as string in activity table (CASSANDRA-17585) + * Implement startup check to prevent Cassandra to potentially spread zombie data (CASSANDRA-17180) + * Allow failing startup on duplicate config keys (CASSANDRA-17379) + * Migrate threshold for minimum keyspace replication factor to guardrails (CASSANDRA-17212) + * Add guardrail to disallow TRUNCATE and DROP TABLE commands (CASSANDRA-17558) + * Add plugin support for CQLSH (CASSANDRA-16456) + * Add guardrail to disallow querying with ALLOW FILTERING (CASSANDRA-17370) + * Enhance SnakeYAML properties to be reusable outside of YAML parsing, support camel case conversion to snake case, and add support to ignore properties (CASSANDRA-17166) + * nodetool compact should support using a key string to find the range to avoid operators having to manually do this (CASSANDRA-17537) + * Add guardrail for data disk usage (CASSANDRA-17150) + * Tool to list data paths of existing tables (CASSANDRA-17568) + * Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560) + * Add support for 
CONTAINS and CONTAINS KEY in conditional UPDATE and DELETE statement (CASSANDRA-10537) + * Migrate advanced config parameters to the new Config types (CASSANDRA-17431) + * Make null to be meaning disabled and leave 0 as a valid value for permissions_update_interval, roles_update_interval, credentials_update_interval (CASSANDRA-17431) + * Fix typo in Config annotation (CASSANDRA-17431) + * Made Converters type safe and fixed a few cases where converters used the wrong type (CASSANDRA-17431) + * Fix null bug in DataStorageSpec and DurationSpec and require units to be added when providing 0 value (CASSANDRA-17431) + * Shutdown ScheduledExecutors as part of node drainage (CASSANDRA-17493) + * Provide JMX endpoint to allow transient logging of blocking read repairs (CASSANDRA-17471) + * Add guardrail for GROUP BY queries (CASSANDRA-17509) + * make pylib PEP and pylint compliant (CASSANDRA-17546) + * Add support for vnodes in jvm-dtest (CASSANDRA-17332) + * Remove guardrails global enable flag (CASSANDRA-17499) + * Clients using JMX are unable to handle non-standard java types but we leak this into our interfaces (CASSANDRA-17527) + * Remove stress server functionality (CASSANDRA-17535) + * Reduce histogram snapshot long[] allocation overhead during speculative read and write threshold updates (CASSANDRA-17523) + * Add guardrail for creation of secondary indexes (CASSANDRA-17498) + * Add guardrail to disallow creation of uncompressed tables (CASSANDRA-17504) + * Add guardrail to disallow creation of new COMPACT STORAGE tables (CASSANDRA-17522) + * repair vtables should expose a completed field due to lack of filtering options in CQL (CASSANDRA-17520) + * remove outdated code from cqlsh (CASSANDRA-17490) + * remove support for deprecated version specific TLS in Python 3.6 (CASSANDRA-17365) + * Add support for IF EXISTS and IF NOT EXISTS in ALTER statements (CASSANDRA-16916) + * resolve several pylint issues in cqlsh.py and pylib (CASSANDRA-17480) + * Streaming sessions 
longer than 3 minutes fail with timeout (CASSANDRA-17510) + * Add ability to track state in repair (CASSANDRA-15399) + * Remove unused 'parse' module (CASSANDRA-17484) + * change six functions in cqlshlib to native Python 3 (CASSANDRA-17417) + * reduce hot-path object allocations required to record local/remote requests against the client request metrics (CASSANDRA-17424) + * Disallow removing DC from system_auth while nodes are active in the DC (CASSANDRA-17478) + * Add guardrail for the number of fields per UDT (CASSANDRA-17385) + * Allow users to change cqlsh history location using env variable (CASSANDRA-17448) + * Add required -f option to use nodetool verify and standalone sstableverify (CASSANDRA-17017) + * Add support for UUID based sstable generation identifiers (CASSANDRA-17048) + * Log largest memtable flush at info instead of debug (CASSANDRA-17472) + * Add native transport rate limiter options to example cassandra.yaml, and expose metric for dispatch rate (CASSANDRA-17423) + * Add diagnostic events for guardrails (CASSANDRA-17197) + * Pre hashed passwords in CQL (CASSANDRA-17334) + * Increase cqlsh version (CASSANDRA-17432) + * Update SUPPORTED_UPGRADE_PATHS to include 3.0 and 3.x to 4.1 paths and remove obsolete tests (CASSANDRA-17362) + * Support DELETE in CQLSSTableWriter (CASSANDRA-14797) + * Failed inbound internode authentication failures generate ugly warning with stack trace (CASSANDRA-17068) + * Expose gossip information in system_views.gossip_info virtual table (CASSANDRA-17002) + * Add guardrails for collection items and size (CASSANDRA-17153) + * Improve guardrails messages (CASSANDRA-17430) + * Remove all usages of junit.framework and ban them via Checkstyle (CASSANDRA-17316) + * Add guardrails for read/write consistency levels (CASSANDRA-17188) + * Add guardrail for SELECT IN terms and their cartesian product (CASSANDRA-17187) + * remove unused imports in cqlsh.py and cqlshlib (CASSANDRA-17413) + * deprecate property 
windows_timer_interval (CASSANDRA-17404) + * Expose streaming as a vtable (CASSANDRA-17390) + * Expose all client options via system_views.clients and nodetool clientstats (CASSANDRA-16378) + * Make startup checks configurable (CASSANDRA-17220) + * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186) + * update Python test framework from nose to pytest (CASSANDRA-17293) + * Fix improper CDC commit log segments deletion in non-blocking mode (CASSANDRA-17233) + * Add support for string concatenations through the + operator (CASSANDRA-17190) + * Limit the maximum hints size per host (CASSANDRA-17142) + * Add a virtual table for exposing batch metrics (CASSANDRA-17225) + * Flatten guardrails config (CASSANDRA-17353) + * Instance failed to start up due to NPE in StartupClusterConnectivityChecker (CASSANDRA-17347) + * add the shorter version of version flag (-v) in cqlsh (CASSANDRA-17236) + * Make vtables accessible via internode messaging (CASSANDRA-17295) + * Add support for PEM based key material for SSL (CASSANDRA-17031) + * Standardize storage configuration parameters' names. Support unit suffixes. 
(CASSANDRA-15234) + * Remove support for Windows (CASSANDRA-16956) + * Runtime-configurable YAML option to prohibit USE statements (CASSANDRA-17318) + * When streaming sees a ClosedChannelException this triggers the disk failure policy (CASSANDRA-17116) + * Add a virtual table for exposing prepared statements metrics (CASSANDRA-17224) + * Remove python 2.x support from cqlsh (CASSANDRA-17242) + * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958) + * Make capacity/validity/updateinterval/activeupdate for Auth Caches configurable via nodetool (CASSANDRA-17063) + * Added startup check for read_ahead_kb setting (CASSANDRA-16436) + * Avoid unecessary array allocations and initializations when performing query checks (CASSANDRA-17209) + * Add guardrail for list operations that require read before write (CASSANDRA-17154) + * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195) + * Remove self-reference in SSTableTidier (CASSANDRA-17205) + * Add guardrail for query page size (CASSANDRA-17189) + * Allow column_index_size_in_kb to be configurable through nodetool (CASSANDRA-17121) + * Emit a metric for number of local read and write calls + * Add non-blocking mode for CDC writes (CASSANDRA-17001) + * Add guardrails framework (CASSANDRA-17147) + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174) + * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082) + * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992) + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159) + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069) + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130) + * Introduce separate rate limiting settings 
for entire SSTable streaming (CASSANDRA-17065) + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914) + * Actively update auth cache in the background (CASSANDRA-16957) + * Add unix time conversion functions (CASSANDRA-17029) + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128) + * Forbid other Future implementations with checkstyle (CASSANDRA-17055) + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085) + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106) + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054) + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023) + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309) + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030) + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027) + * Log time spent writing keys during compaction (CASSANDRA-17037) + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976) + * Add metrics and logging around index summary redistribution (CASSANDRA-17036) + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557) + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795) + * Add broadcast_rpc_address to system.local (CASSANDRA-11181) + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337) + * add credentials file support to CQLSH (CASSANDRA-16983) + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026) + * Allow reverse iteration of resources during permissions 
checking (CASSANDRA-17016) + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879) + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666) + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896) + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290) + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988) + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844) + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153) + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806) + * Include SASI components to snapshots (CASSANDRA-15134) + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938) + * Reduce native transport max frame size to 16MB (CASSANDRA-16886) + * Add support for filtering using IN restrictions (CASSANDRA-14344) + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404) + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850) + * Add TTL support to nodetool snapshots (CASSANDRA-16789) + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842) + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859) + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663) + * Implement nodetool getauditlog command (CASSANDRA-16725) + * Clean up repair code (CASSANDRA-13720) + * Background schedule to clean up orphaned hints files (CASSANDRA-16815) + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776) + * Batch 
the token metadata update to improve the speed (CASSANDRA-15291) + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775) + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760) + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671) + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510) + * Log when compacting many tombstones (CASSANDRA-16780) + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799) + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701) + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651) + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 (CASSANDRA-17040) +Merged from 4.0: +Merged from 3.11: Merged from 3.0: * Fix issue where frozen maps may not be serialized in the correct order (CASSANDRA-17623) * Suppress CVE-2022-24823 (CASSANDRA-17633) diff --cc src/java/org/apache/cassandra/repair/LocalSyncTask.java index 71cec282ad,99315754bd..28a6accff7 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@@ -19,6 -19,9 +19,7 @@@ package org.apache.cassandra.repair import java.util.Collections; import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; + import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@@ -40,7 -43,6 +41,9 @@@ import org.apache.cassandra.streaming.S import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.TimeUUID; ++import org.apache.cassandra.utils.concurrent.AsyncPromise; ++import 
org.apache.cassandra.utils.concurrent.Promise; /** * LocalSyncTask performs streaming between local(coordinator) node and remote replica. @@@ -58,11 -60,11 +61,11 @@@ public class LocalSyncTask extends Sync @VisibleForTesting public final boolean transferRanges; - private boolean active = true; - private StreamPlan streamPlan; + private final AtomicBoolean active = new AtomicBoolean(true); - private final CompletableFuture<StreamPlan> planFuture = new CompletableFuture<>(); ++ private final Promise<StreamPlan> planPromise = new AsyncPromise<>(); public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, - List<Range<Token>> diff, UUID pendingRepair, + List<Range<Token>> diff, TimeUUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) { super(desc, local, remote, diff, previewKind); @@@ -115,8 -117,9 +118,9 @@@ logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - streamPlan = createStreamPlan(); - streamPlan.execute(); + StreamPlan plan = createStreamPlan(); + plan.execute(); - planFuture.complete(plan); ++ planPromise.setSuccess(plan); } } @@@ -169,12 -171,11 +172,11 @@@ } @Override - public synchronized void onFailure(Throwable t) + public void onFailure(Throwable t) { - if (active) + if (active.compareAndSet(true, false)) { - active = false; - setException(t); + tryFailure(t); finished(); } } @@@ -191,22 -192,12 +193,12 @@@ } @Override - public synchronized void abort() + public void abort() { - if (active) - planFuture.whenComplete((plan, cause) -> ++ planPromise.addCallback((plan, cause) -> { - if (streamPlan == null) - { - active = false; - String message = String.format("Sync for session %s between %s and %s on %s aborted before starting", - desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily); - logger.debug("{} {}", previewKind.logPrefix(desc.sessionId), message); - trySuccess(stat); - } - else - { - 
streamPlan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort); - } - } + assert plan != null : "StreamPlan future should never be completed exceptionally"; + plan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort); + }); } } diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 595890def8,b2739b331a..2036262e54 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -1084,42 -1037,18 +1084,48 @@@ public class StreamSession implements I public void onClose(InetAddressAndPort from); } + public static String createLogTag(StreamSession session) + { + return createLogTag(session, (Object) null); + } + + public static String createLogTag(StreamSession session, StreamingChannel channel) + { + return createLogTag(session, channel == null ? null : channel.id()); + } + + public static String createLogTag(StreamSession session, Channel channel) + { + return createLogTag(session, channel == null ? 
null : channel.id()); + } + + public static String createLogTag(StreamSession session, Object channelId) + { + StringBuilder sb = new StringBuilder(64); + sb.append("[Stream"); + + if (session != null) + sb.append(" #").append(session.planId()); + + if (channelId != null) + sb.append(" channel: ").append(channelId); + + sb.append(']'); + return sb.toString(); + } + public synchronized void abort() { + if (state.isFinalState()) + { + logger.debug("[Stream #{}] Stream session with peer {} is already in a final state on abort.", planId(), peer); + return; + } + logger.info("[Stream #{}] Aborting stream session with peer {}...", planId(), peer); - if (getMessageSender().connected()) - getMessageSender().sendMessage(new SessionFailedMessage()); + if (channel.connected()) + channel.sendControlMessage(new SessionFailedMessage()); try { diff --cc src/java/org/apache/cassandra/streaming/StreamingChannel.java index f49089c48a,0000000000..a638638bc2 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java +++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java @@@ -1,78 -1,0 +1,87 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.function.IntFunction; + +import io.netty.util.concurrent.Future; //checkstyle: permit this import +import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory; +import org.apache.cassandra.utils.Shared; + +import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES; +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; + +@Shared(scope = SIMULATION, inner = INTERFACES) +public interface StreamingChannel +{ + public interface Factory + { + public static class Global + { + private static StreamingChannel.Factory FACTORY = new NettyStreamingConnectionFactory(); + public static StreamingChannel.Factory streamingFactory() + { + return FACTORY; + } + + public static void unsafeSet(StreamingChannel.Factory factory) + { + FACTORY = factory; + } + } + + StreamingChannel create(InetSocketAddress to, int messagingVersion, Kind kind) throws IOException; ++ ++ default StreamingChannel create(InetSocketAddress to, ++ InetSocketAddress preferred, ++ int messagingVersion, ++ StreamingChannel.Kind kind) throws IOException ++ { ++ // Implementations can decide whether or not to do something with the preferred address. 
++ return create(to, messagingVersion, kind); ++ } + } + + public enum Kind { CONTROL, FILE } + + public interface Send + { + void send(IntFunction<StreamingDataOutputPlus> outSupplier) throws IOException; + } + + Object id(); + String description(); + + InetSocketAddress peer(); + InetSocketAddress connectedTo(); + boolean connected(); + + StreamingDataInputPlus in(); + + /** + * until closed, cannot invoke {@link #send(Send)} + */ + StreamingDataOutputPlus acquireOut(); + Future<?> send(Send send) throws IOException; + + Future<?> close(); + void onClose(Runnable runOnClose); +} diff --cc src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java index 946df59892,0000000000..6a57e395e4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java @@@ -1,76 -1,0 +1,85 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming.async; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.google.common.annotations.VisibleForTesting; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; // checkstyle: permit this import +import org.apache.cassandra.net.ConnectionCategory; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.OutboundConnectionInitiator.Result; +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess; +import org.apache.cassandra.net.OutboundConnectionSettings; +import org.apache.cassandra.streaming.StreamingChannel; + +import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress; +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; + +public class NettyStreamingConnectionFactory implements StreamingChannel.Factory +{ + @VisibleForTesting + public static int MAX_CONNECT_ATTEMPTS = 3; + + public static NettyStreamingChannel connect(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException + { + EventLoop eventLoop = MessagingService.instance().socketFactory.outboundStreamingGroup().next(); + + int attempts = 0; + while (true) + { + Future<Result<StreamingSuccess>> result = initiateStreaming(eventLoop, template.withDefaults(ConnectionCategory.STREAMING), messagingVersion); + result.awaitUninterruptibly(); // initiate has its own timeout, so this is "guaranteed" to return relatively promptly + if (result.isSuccess()) + { + Channel channel = result.getNow().success().channel; + NettyStreamingChannel streamingChannel = new NettyStreamingChannel(messagingVersion, channel, kind); + if (kind == StreamingChannel.Kind.CONTROL) + { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("stream", streamingChannel); + } + return streamingChannel; + } + 
+ if (++attempts == MAX_CONNECT_ATTEMPTS) + throw new IOException("failed to connect to " + template.to + " for streaming data", result.cause()); + } + } + + @Override + public StreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException + { + return connect(new OutboundConnectionSettings(getByAddress(to)), messagingVersion, kind); + } ++ ++ @Override ++ public StreamingChannel create(InetSocketAddress to, ++ InetSocketAddress preferred, ++ int messagingVersion, ++ StreamingChannel.Kind kind) throws IOException ++ { ++ return connect(new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred)), messagingVersion, kind); ++ } +} diff --cc src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java index 4f3a443515,0000000000..c2e551edb6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java +++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java @@@ -1,435 -1,0 +1,449 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming.async; + +import java.io.IOError; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ClosedByInterruptException; +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; + +import javax.annotation.Nullable; + - import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.streaming.StreamDeserializingTask; - import org.apache.cassandra.streaming.StreamingChannel; - import org.apache.cassandra.streaming.StreamingDataOutputPlus; - import org.apache.cassandra.utils.concurrent.ImmediateFuture; - import org.apache.cassandra.utils.concurrent.Semaphore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; // checkstyle: permit this import +import org.apache.cassandra.concurrent.ExecutorPlus; ++import org.apache.cassandra.db.SystemKeyspace; ++import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.streaming.StreamDeserializingTask; ++import org.apache.cassandra.streaming.StreamingChannel; ++import org.apache.cassandra.streaming.StreamingDataOutputPlus; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.IncomingStreamMessage; +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; +import org.apache.cassandra.streaming.messages.StreamMessage; ++import org.apache.cassandra.utils.concurrent.ImmediateFuture; ++import org.apache.cassandra.utils.concurrent.Semaphore; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import static com.google.common.base.Throwables.getRootCause; +import static java.lang.Integer.parseInt; +import static java.lang.String.format; +import static java.lang.System.getProperty; +import static 
java.lang.Thread.currentThread; +import static java.util.concurrent.TimeUnit.*; ++ +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.config.Config.PROPERTY_PREFIX; +import static org.apache.cassandra.streaming.StreamSession.createLogTag; +import static org.apache.cassandra.streaming.messages.StreamMessage.serialize; +import static org.apache.cassandra.streaming.messages.StreamMessage.serializedSize; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors; +import static org.apache.cassandra.utils.JVMStabilityInspector.inspectThrowable; +import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; +import static org.apache.cassandra.utils.concurrent.Semaphore.newFairSemaphore; + - import static org.apache.cassandra.utils.Clock.Global.nanoTime; - +/** + * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s + * for sending {@link OutgoingStreamMessage} instances; all other {@link StreamMessage} types are sent via + * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck + * behind a stream transfer. + * + * One of the challenges when sending streams is we might need to delay shipping the stream if: + * + * - we've exceeded our network I/O use due to rate limiting (at the cassandra level) + * - the receiver isn't keeping up, which causes the local TCP socket buffer to not empty, which causes epoll writes to not + * move any bytes to the socket, which causes buffers to stick around in user-land (a/k/a cassandra) memory. + * + * When those conditions occur, it's easy enough to reschedule processing the stream once the resources pick up + * (we acquire the permits from the rate limiter, or the socket drains). 
However, we need to ensure that + * no other messages are submitted to the same channel while the current stream is still being processed. + */ +public class StreamingMultiplexedChannel +{ + private static final Logger logger = LoggerFactory.getLogger(StreamingMultiplexedChannel.class); + + private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = getAvailableProcessors(); + private static final int MAX_PARALLEL_TRANSFERS = parseInt(getProperty(PROPERTY_PREFIX + "streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS))); + + // a simple mechanism for allowing a degree of fairness across multiple sessions + private static final Semaphore fileTransferSemaphore = newFairSemaphore(DEFAULT_MAX_PARALLEL_TRANSFERS); + + private final StreamingChannel.Factory factory; + private final InetAddressAndPort to; + private final StreamSession session; + private final int messagingVersion; + + private volatile boolean closed; + + /** + * A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an + * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received). + */ + private volatile StreamingChannel controlChannel; + + // note: this really doesn't need to be a LBQ, just something that's thread safe + private final Collection<ScheduledFuture<?>> channelKeepAlives = newBlockingQueue(); + + private final ExecutorPlus fileTransferExecutor; + + /** + * A mapping of each {@link #fileTransferExecutor} thread to a channel that can be written to (on that thread). 
+ */ + private final ConcurrentMap<Thread, StreamingChannel> threadToChannelMap = new ConcurrentHashMap<>(); + + public StreamingMultiplexedChannel(StreamSession session, StreamingChannel.Factory factory, InetAddressAndPort to, @Nullable StreamingChannel controlChannel, int messagingVersion) + { + this.session = session; + this.factory = factory; + this.to = to; + this.messagingVersion = messagingVersion; + this.controlChannel = controlChannel; + + String name = session.peer.toString().replace(':', '.'); + fileTransferExecutor = executorFactory() + .configurePooled("NettyStreaming-Outbound-" + name, MAX_PARALLEL_TRANSFERS) + .withKeepAlive(1L, SECONDS).build(); + } + + + + public InetAddressAndPort peer() + { + return to; + } + + public InetSocketAddress connectedTo() + { + return controlChannel == null ? to : controlChannel.connectedTo(); + } + + /** + * Used by initiator to setup control message channel connecting to follower + */ + private void setupControlMessageChannel() throws IOException + { + if (controlChannel == null) + { + /* + * Inbound handlers are needed: + * a) for initiator's control channel (the first outbound channel) to receive follower's message. + * b) for streaming receiver (note: both initiator and follower can receive streaming files) to receive files, + * in {@link Handler#setupStreamingPipeline} + */ - controlChannel = createChannel(StreamingChannel.Kind.CONTROL); ++ controlChannel = createControlChannel(); + } + } + - private StreamingChannel createChannel(StreamingChannel.Kind kind) throws IOException ++ private StreamingChannel createControlChannel() throws IOException + { + logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? 
"follower" : "initiator"); + - StreamingChannel channel = factory.create(to, messagingVersion, kind); - if (kind == StreamingChannel.Kind.CONTROL) - { - executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()), - new StreamDeserializingTask(session, channel, messagingVersion)); - session.attachInbound(channel); - } ++ StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL); ++ executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()), ++ new StreamDeserializingTask(session, channel, messagingVersion)); ++ session.attachInbound(channel); + session.attachOutbound(channel); + - logger.debug("Creating {}", channel.description()); ++ logger.debug("Creating control {}", channel.description()); ++ return channel; ++ } ++ ++ private StreamingChannel createFileChannel(InetAddressAndPort connectTo) throws IOException ++ { ++ logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? 
"follower" : "initiator"); ++ ++ StreamingChannel channel = factory.create(to, connectTo, messagingVersion, StreamingChannel.Kind.FILE); ++ session.attachOutbound(channel); ++ ++ logger.debug("Creating file {}", channel.description()); + return channel; + } + + public Future<?> sendControlMessage(StreamMessage message) + { + try + { + setupControlMessageChannel(); + return sendMessage(controlChannel, message); + } + catch (Exception e) + { + close(); + session.onError(e); + return ImmediateFuture.failure(e); + } + + } + public Future<?> sendMessage(StreamingChannel channel, StreamMessage message) + { + if (closed) + throw new RuntimeException("stream has been closed, cannot send " + message); + + if (message instanceof OutgoingStreamMessage) + { + if (session.isPreview()) + throw new RuntimeException("Cannot send stream data messages for preview streaming sessions"); + if (logger.isDebugEnabled()) + logger.debug("{} Sending {}", createLogTag(session), message); - return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage)message)); ++ ++ InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to); ++ return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo)); + } + + try + { + Future<?> promise = channel.send(outSupplier -> { + // we anticipate that the control messages are rather small, so allocating a ByteBuf shouldn't blow out of memory. 
+ long messageSize = serializedSize(message, messagingVersion); + if (messageSize > 1 << 30) + { + throw new IllegalStateException(format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s", + createLogTag(session, controlChannel.id()), messageSize, message.type)); + } + try (StreamingDataOutputPlus out = outSupplier.apply((int) messageSize)) + { + StreamMessage.serialize(message, out, messagingVersion, session); + } + }); + promise.addListener(future -> onMessageComplete(future, message)); + return promise; + } + catch (Exception e) + { + close(); + session.onError(e); + return ImmediateFuture.failure(e); + } + } + + /** + * Decides what to do after a {@link StreamMessage} is processed. + * + * Note: this is called from the netty event loop. + * + * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate + * the status of aborting any remaining tasks in the session. + */ + Future<?> onMessageComplete(Future<?> future, StreamMessage msg) + { + Throwable cause = future.cause(); + if (cause == null) + return null; + + Channel channel = future instanceof ChannelFuture ? ((ChannelFuture)future).channel() : null; + logger.error("{} failed to send a stream message/data to peer {}: msg = {}", + createLogTag(session, channel), to, msg, future.cause()); + + // StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try + // to send any failure messages + return session.onError(cause); + } + + class FileStreamTask implements Runnable + { + /** + * Time interval, in minutes, to wait between logging a message indicating that we're waiting on a semaphore + * permit to become available. + */ + private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3; + + /** + * Even though we expect only an {@link OutgoingStreamMessage} at runtime, the type here is {@link StreamMessage} + * to facilitate simpler testing. 
+ */ + private final StreamMessage msg; + - FileStreamTask(OutgoingStreamMessage ofm) ++ private final InetAddressAndPort connectTo; ++ ++ FileStreamTask(OutgoingStreamMessage ofm, InetAddressAndPort connectTo) + { + this.msg = ofm; ++ this.connectTo = connectTo; + } + + /** + * For testing purposes + */ + FileStreamTask(StreamMessage msg) + { + this.msg = msg; ++ this.connectTo = null; + } + + @Override + public void run() + { + if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL)) + return; + + StreamingChannel channel = null; + try + { - channel = getOrCreateChannel(); ++ channel = getOrCreateFileChannel(connectTo); + + // close the DataOutputStreamPlus as we're done with it - but don't close the channel + try (StreamingDataOutputPlus out = channel.acquireOut()) + { + serialize(msg, out, messagingVersion, session); + } + } + catch (Exception e) + { + session.onError(e); + } + catch (Throwable t) + { + if (closed && getRootCause(t) instanceof ClosedByInterruptException && fileTransferExecutor.isShutdown()) + { + logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", createLogTag(session, channel)); + } + else + { + inspectThrowable(t); + if (!session.state().isFinalState()) + session.onError(t); + } + } + finally + { + fileTransferSemaphore.release(1); + } + } + + boolean acquirePermit(int logInterval) + { + long logIntervalNanos = MINUTES.toNanos(logInterval); + long timeOfLastLogging = nanoTime(); + while (true) + { + if (closed) + return false; + try + { + if (fileTransferSemaphore.tryAcquire(1, 1, SECONDS)) + return true; + + // log a helpful message to operators in case they are wondering why a given session might not be making progress. + long now = nanoTime(); + if (now - timeOfLastLogging > logIntervalNanos) + { + timeOfLastLogging = now; + OutgoingStreamMessage ofm = (OutgoingStreamMessage)msg; + + if (logger.isInfoEnabled()) + logger.info("{} waiting to acquire a permit to begin streaming {}. 
This message logs every {} minutes", + createLogTag(session), ofm.getName(), logInterval); + } + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + } + } + - private StreamingChannel getOrCreateChannel() ++ private StreamingChannel getOrCreateFileChannel(InetAddressAndPort connectTo) + { + Thread currentThread = currentThread(); + try + { + StreamingChannel channel = threadToChannelMap.get(currentThread); + if (channel != null) + return channel; + - channel = createChannel(StreamingChannel.Kind.FILE); ++ channel = createFileChannel(connectTo); + threadToChannelMap.put(currentThread, channel); + return channel; + } + catch (Exception e) + { + throw new IOError(e); + } + } + + /** + * For testing purposes + */ + void injectChannel(StreamingChannel channel) + { + Thread currentThread = currentThread(); + if (threadToChannelMap.get(currentThread) != null) + throw new IllegalStateException("previous channel already set"); + + threadToChannelMap.put(currentThread, channel); + } + + /** + * For testing purposes + */ + void unsetChannel() + { + threadToChannelMap.remove(currentThread()); + } + } + + /** + * For testing purposes only. 
+ */ + public void setClosed() + { + closed = true; + } + + void setControlChannel(NettyStreamingChannel channel) + { + controlChannel = channel; + } + + int semaphoreAvailablePermits() + { + return fileTransferSemaphore.permits(); + } + + public boolean connected() + { + return !closed && (controlChannel == null || controlChannel.connected()); + } + + public void close() + { + if (closed) + return; + + closed = true; + if (logger.isDebugEnabled()) + logger.debug("{} Closing stream connection channels on {}", createLogTag(session), to); + for (ScheduledFuture<?> future : channelKeepAlives) + future.cancel(false); + channelKeepAlives.clear(); + + threadToChannelMap.values().forEach(StreamingChannel::close); + threadToChannelMap.clear(); + fileTransferExecutor.shutdownNow(); + } +} diff --cc src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java index 354ed941c9,7db2aa68b3..eb34cef1ef --- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java +++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java @@@ -43,12 -40,12 +43,29 @@@ public class BulkLoadConnectionFactory this.outboundBindAny = outboundBindAny; } - public Channel createConnection(OutboundConnectionSettings template, int messagingVersion) throws IOException ++ @Override + public NettyStreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException ++ { ++ OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to)); ++ return create(template, messagingVersion, kind); ++ } ++ ++ @Override ++ public StreamingChannel create(InetSocketAddress to, ++ InetSocketAddress preferred, ++ int messagingVersion, ++ StreamingChannel.Kind kind) throws IOException ++ { ++ // Supply a preferred address to the template, which will be overwritten if encryption is configured. 
++ OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred)); ++ return create(template, messagingVersion, kind); ++ } ++ ++ private NettyStreamingChannel create(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException { // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none' // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader -- // does not know which node is in which dc/rack, connecting to SSL port is always the option. - OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to)); - ++ // does not know which node is in which dc/rack, connecting to SSL port is always the option. if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none) template = template.withConnectTo(template.to.withPort(secureStoragePort)).withEncryption(encryptionOptions); diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java index c3c2b1427d,37c9171542..74a5c6f51d --- a/test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java @@@ -27,25 -27,24 +27,30 @@@ import net.bytebuddy.ByteBuddy import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.assertj.core.api.Assertions; import org.junit.Test; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import 
org.apache.cassandra.db.compaction.CompactionIterator; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + import org.apache.cassandra.db.streaming.CassandraIncomingFile; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.TimeUUID; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@@ -54,34 -53,6 +59,33 @@@ import static org.apache.cassandra.dist public class RepairErrorsTest extends TestBaseImpl { + @Test + public void testRemoteValidationFailure() throws IOException + { + Cluster.Builder builder = Cluster.build(2) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .withInstanceInitializer(ByteBuddyHelper::install); + try (Cluster cluster = builder.createWithoutStarting()) + { + cluster.setUncaughtExceptionsFilter((i, throwable) -> { + if (i == 2) + return throwable.getMessage() != null && throwable.getMessage().contains("IGNORE"); + return false; + }); + + cluster.startup(); + init(cluster); + + cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, 
i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + long mark = cluster.get(1).logs().mark(); + cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().failure()); + Assertions.assertThat(cluster.get(1).logs().grep(mark, "^ERROR").getResult()).isEmpty(); + } + } + - @SuppressWarnings("Convert2MethodRef") @Test public void testRemoteSyncFailure() throws Exception { @@@ -124,62 -95,65 +128,111 @@@ result = cluster.get(1).nodetoolResult("repair", KEYSPACE); result.asserts().success(); - // Make sure we've cleaned up sessions and parent sessions: - Integer parents = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount()); - assertEquals(0, parents.intValue()); - Integer sessions = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.sessionCount()); - assertEquals(0, sessions.intValue()); + assertNoActiveRepairSessions(cluster.get(1)); + + cluster.forEach(i -> Assertions.assertThat(i.logs().grep("SomeRepairFailedException").getResult()) + .describedAs("node%d logged hidden exception org.apache.cassandra.repair.SomeRepairFailedException", i.config().num()) + .isEmpty()); } } + @Test + public void testRemoteStreamFailure() throws Exception + { + try (Cluster cluster = init(Cluster.build(3) + .withConfig(config -> config.with(GOSSIP, NETWORK) + .set("disk_failure_policy", "stop") + .set("disk_access_mode", "mmap_index_only")) + .withInstanceInitializer(ByteBuddyHelperStreamFailure::installStreamHandlingFailure).start())) + { + // Make sure we don't auto-compact the peers table. We'll need to try it manually later. 
+ cluster.get(1).runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers_v2"); + cfs.disableAutoCompaction(); + }); + + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, x int)"); + + // On repair, this data layout will require two (local) syncs from node 1 and one remote sync from node 2: + cluster.get(1).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 1, 1); + cluster.get(2).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 2, 2); + cluster.get(3).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 3, 3); + cluster.forEach(i -> i.flush(KEYSPACE)); + + // Flush system.peers_v2, or there won't be any SSTables... + cluster.forEach(i -> i.flush("system")); + + // Stream reading will fail on node 3, and this will interrupt node 1 just as it starts to stream to node 2. + NodeToolResult result = cluster.get(1).nodetoolResult("repair", KEYSPACE); + result.asserts().failure(); + + // Ensure that the peers table is compactable even after the file streaming task is interrupted. 
+ cluster.get(1).runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers_v2"); + cfs.forceMajorCompaction(); + }); + + assertTrue(cluster.get(1).logs().grep("Stopping transports as disk_failure_policy is stop").getResult().isEmpty()); + assertTrue(cluster.get(1).logs().grep("FSReadError").getResult().isEmpty()); + + assertNoActiveRepairSessions(cluster.get(1)); + } + } + + @Test + public void testNoSuchRepairSessionAnticompaction() throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .withInstanceInitializer(ByteBuddyHelper::installACNoSuchRepairSession) + .start())) + { + cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + long mark = cluster.get(1).logs().mark(); + cluster.forEach(i -> i.nodetoolResult("repair", KEYSPACE).asserts().failure()); + assertTrue(cluster.get(1).logs().grep(mark, "^ERROR").getResult().isEmpty()); + } + } + + @SuppressWarnings("Convert2MethodRef") + private void assertNoActiveRepairSessions(IInvokableInstance instance) + { + // Make sure we've cleaned up sessions and parent sessions: + Integer parents = instance.callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount()); + assertEquals(0, parents.intValue()); + Integer sessions = instance.callOnInstance(() -> ActiveRepairService.instance.sessionCount()); + assertEquals(0, sessions.intValue()); + } + public static class ByteBuddyHelper { + public static void install(ClassLoader cl, int nodeNumber) + { + if (nodeNumber == 2) + { + new ByteBuddy().redefine(CompactionIterator.class) + .method(named("next")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + 
public static void installACNoSuchRepairSession(ClassLoader cl, int nodeNumber) + { + if (nodeNumber == 2) + { + new ByteBuddy().redefine(CompactionManager.class) + .method(named("validateSSTableBoundsForAnticompaction")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + public static void installStreamPlanExecutionFailure(ClassLoader cl, int nodeNumber) { if (nodeNumber == 2) @@@ -238,5 -205,64 +291,54 @@@ return zuper.call(); } - - @SuppressWarnings({"unused", "ResultOfMethodCallIgnored"}) - public static Throwable extractThrowable(Future<?> future, @SuperCall Callable<Throwable> zuper) throws Exception - { - if (Thread.currentThread().getName().contains("RepairJobTask")) - // Clear the interrupt flag so the FSReadError is propagated correctly in DebuggableThreadPoolExecutor: - Thread.interrupted(); - - return zuper.call(); - } } + + public static class ByteBuddyHelperStreamFailure + { + public static void installStreamHandlingFailure(ClassLoader cl, int nodeNumber) + { + if (nodeNumber == 3) + { + new ByteBuddy().rebase(CassandraIncomingFile.class) - .method(named("read")) - .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class)) - .make() - .load(cl, ClassLoadingStrategy.Default.INJECTION); ++ .method(named("read")) ++ .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class)) ++ .make() ++ .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + if (nodeNumber == 1) + { + new ByteBuddy().rebase(SystemKeyspace.class) - .method(named("getPreferredIP")) - .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class)) - .make() - .load(cl, ClassLoadingStrategy.Default.INJECTION); ++ .method(named("getPreferredIP")) ++ .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class)) ++ .make() ++ .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + @SuppressWarnings("unused") + public static void read(DataInputPlus in, int version) throws 
IOException + { + throw new IOException("Failing incoming file read from test!"); + } + + @SuppressWarnings("unused") + public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep, @SuperCall Callable<InetAddressAndPort> zuper) throws Exception + { - if (Thread.currentThread().getName().contains("NettyStreaming-Outbound") && ep.address.toString().contains("127.0.0.2")) ++ if (Thread.currentThread().getName().contains("NettyStreaming-Outbound") && ep.getAddress().toString().contains("127.0.0.2")) + { + try + { + TimeUnit.SECONDS.sleep(10); + } + catch (InterruptedException e) + { + // Leave the interrupt flag intact for the ChannelProxy downstream... + Thread.currentThread().interrupt(); + } + } + + return zuper.call(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org