This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 663e51d92868cbd045a83d7fa53e373bb28721a0
Merge: 133ad50a84 a00d8fd5ba
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Thu Jun 2 12:07:38 2022 -0500

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/repair/LocalSyncTask.java |  46 ++++-----
 .../apache/cassandra/streaming/StreamSession.java  |   6 ++
 .../cassandra/streaming/StreamingChannel.java      |   9 ++
 .../async/NettyStreamingConnectionFactory.java     |   9 ++
 .../async/StreamingMultiplexedChannel.java         |  60 ++++++-----
 .../cassandra/tools/BulkLoadConnectionFactory.java |  21 +++-
 .../distributed/test/RepairErrorsTest.java         | 114 +++++++++++++++++++--
 8 files changed, 208 insertions(+), 58 deletions(-)

diff --cc CHANGES.txt
index 07ccdf23a8,31c62caa57..cfb7bcb41a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,182 -1,7 +1,183 @@@
 -4.0.5
 +4.1-alpha2
 + * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619) 
 +Merged from 4.0:
+  * Ensure FileStreamTask cannot compromise shared channel proxy for system table when interrupted (CASSANDRA-17663)
   * silence benign SslClosedEngineException (CASSANDRA-17565)
  Merged from 3.11:
 +Merged from 3.0:
 +
 +
 +4.1-alpha1
 + * Handle config parameters upper bound on startup; Fix auto_snapshot_ttl and paxos_purge_grace_period min unit validations (CASSANDRA-17571)
 + * Fix leak of non-standard Java types in our Exceptions as clients using JMX are unable to handle them.
 +   Remove useless validation that leads to unnecessary additional read of cassandra.yaml on startup (CASSANDRA-17638)
 + * Fix repair_request_timeout_in_ms and remove paxos_auto_repair_threshold_mb (CASSANDRA-17557)
 + * Incremental repair leaks SomeRepairFailedException after switch away from flatMap (CASSANDRA-17620)
 + * StorageService read threshold get methods throw NullPointerException due to not handling null configs (CASSANDRA-17593)
 + * Rename truncate_drop guardrail to drop_truncate_table (CASSANDRA-17592)
 + * nodetool enablefullquerylog can NPE when directory has no files (CASSANDRA-17595)
 + * Add auto_snapshot_ttl configuration (CASSANDRA-16790)
 + * List snapshots of dropped tables (CASSANDRA-16843)
 + * Add information whether sstables are dropped to SchemaChangeListener (CASSANDRA-17582)
 + * Add a pluggable memtable API (CEP-11 / CASSANDRA-17034)
 + * Save sstable id as string in activity table (CASSANDRA-17585)
 + * Implement startup check to prevent Cassandra to potentially spread zombie data (CASSANDRA-17180)
 + * Allow failing startup on duplicate config keys (CASSANDRA-17379)
 + * Migrate threshold for minimum keyspace replication factor to guardrails (CASSANDRA-17212)
 + * Add guardrail to disallow TRUNCATE and DROP TABLE commands (CASSANDRA-17558)
 + * Add plugin support for CQLSH (CASSANDRA-16456)
 + * Add guardrail to disallow querying with ALLOW FILTERING (CASSANDRA-17370)
 + * Enhance SnakeYAML properties to be reusable outside of YAML parsing, support camel case conversion to snake case, and add support to ignore properties (CASSANDRA-17166)
 + * nodetool compact should support using a key string to find the range to avoid operators having to manually do this (CASSANDRA-17537)
 + * Add guardrail for data disk usage (CASSANDRA-17150)
 + * Tool to list data paths of existing tables (CASSANDRA-17568)
 + * Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560)
 + * Add support for CONTAINS and CONTAINS KEY in conditional UPDATE and DELETE statement (CASSANDRA-10537)
 + * Migrate advanced config parameters to the new Config types (CASSANDRA-17431)
 + * Make null to be meaning disabled and leave 0 as a valid value for permissions_update_interval, roles_update_interval, credentials_update_interval (CASSANDRA-17431)
 + * Fix typo in Config annotation (CASSANDRA-17431)
 + * Made Converters type safe and fixed a few cases where converters used the wrong type (CASSANDRA-17431)
 + * Fix null bug in DataStorageSpec and DurationSpec and require units to be added when providing 0 value (CASSANDRA-17431)
 + * Shutdown ScheduledExecutors as part of node drainage (CASSANDRA-17493)
 + * Provide JMX endpoint to allow transient logging of blocking read repairs (CASSANDRA-17471)
 + * Add guardrail for GROUP BY queries (CASSANDRA-17509)
 + * make pylib PEP and pylint compliant (CASSANDRA-17546)
 + * Add support for vnodes in jvm-dtest (CASSANDRA-17332)
 + * Remove guardrails global enable flag (CASSANDRA-17499)
 + * Clients using JMX are unable to handle non-standard java types but we leak this into our interfaces (CASSANDRA-17527)
 + * Remove stress server functionality (CASSANDRA-17535)
 + * Reduce histogram snapshot long[] allocation overhead during speculative read and write threshold updates (CASSANDRA-17523)
 + * Add guardrail for creation of secondary indexes (CASSANDRA-17498)
 + * Add guardrail to disallow creation of uncompressed tables (CASSANDRA-17504)
 + * Add guardrail to disallow creation of new COMPACT STORAGE tables (CASSANDRA-17522)
 + * repair vtables should expose a completed field due to lack of filtering options in CQL (CASSANDRA-17520)
 + * remove outdated code from cqlsh (CASSANDRA-17490)
 + * remove support for deprecated version specific TLS in Python 3.6 (CASSANDRA-17365)
 + * Add support for IF EXISTS and IF NOT EXISTS in ALTER statements (CASSANDRA-16916)
 + * resolve several pylint issues in cqlsh.py and pylib (CASSANDRA-17480)
 + * Streaming sessions longer than 3 minutes fail with timeout (CASSANDRA-17510)
 + * Add ability to track state in repair (CASSANDRA-15399)
 + * Remove unused 'parse' module (CASSANDRA-17484)
 + * change six functions in cqlshlib to native Python 3 (CASSANDRA-17417)
 + * reduce hot-path object allocations required to record local/remote requests against the client request metrics (CASSANDRA-17424)
 + * Disallow removing DC from system_auth while nodes are active in the DC (CASSANDRA-17478)
 + * Add guardrail for the number of fields per UDT (CASSANDRA-17385)
 + * Allow users to change cqlsh history location using env variable (CASSANDRA-17448)
 + * Add required -f option to use nodetool verify and standalone sstableverify (CASSANDRA-17017)
 + * Add support for UUID based sstable generation identifiers (CASSANDRA-17048)
 + * Log largest memtable flush at info instead of debug (CASSANDRA-17472)
 + * Add native transport rate limiter options to example cassandra.yaml, and expose metric for dispatch rate (CASSANDRA-17423)
 + * Add diagnostic events for guardrails (CASSANDRA-17197)
 + * Pre hashed passwords in CQL (CASSANDRA-17334)
 + * Increase cqlsh version (CASSANDRA-17432)
 + * Update SUPPORTED_UPGRADE_PATHS to include 3.0 and 3.x to 4.1 paths and remove obsolete tests (CASSANDRA-17362)
 + * Support DELETE in CQLSSTableWriter (CASSANDRA-14797)
 + * Failed inbound internode authentication failures generate ugly warning with stack trace (CASSANDRA-17068)
 + * Expose gossip information in system_views.gossip_info virtual table (CASSANDRA-17002)
 + * Add guardrails for collection items and size (CASSANDRA-17153)
 + * Improve guardrails messages (CASSANDRA-17430)
 + * Remove all usages of junit.framework and ban them via Checkstyle (CASSANDRA-17316)
 + * Add guardrails for read/write consistency levels (CASSANDRA-17188)
 + * Add guardrail for SELECT IN terms and their cartesian product (CASSANDRA-17187)
 + * remove unused imports in cqlsh.py and cqlshlib (CASSANDRA-17413)
 + * deprecate property windows_timer_interval (CASSANDRA-17404)
 + * Expose streaming as a vtable (CASSANDRA-17390)
 + * Expose all client options via system_views.clients and nodetool clientstats (CASSANDRA-16378)
 + * Make startup checks configurable (CASSANDRA-17220)
 + * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
 + * update Python test framework from nose to pytest (CASSANDRA-17293)
 + * Fix improper CDC commit log segments deletion in non-blocking mode (CASSANDRA-17233)
 + * Add support for string concatenations through the + operator (CASSANDRA-17190)
 + * Limit the maximum hints size per host (CASSANDRA-17142)
 + * Add a virtual table for exposing batch metrics (CASSANDRA-17225)
 + * Flatten guardrails config (CASSANDRA-17353)
 + * Instance failed to start up due to NPE in StartupClusterConnectivityChecker (CASSANDRA-17347)
 + * add the shorter version of version flag (-v) in cqlsh (CASSANDRA-17236)
 + * Make vtables accessible via internode messaging (CASSANDRA-17295)
 + * Add support for PEM based key material for SSL (CASSANDRA-17031)
 + * Standardize storage configuration parameters' names. Support unit suffixes. (CASSANDRA-15234)
 + * Remove support for Windows (CASSANDRA-16956)
 + * Runtime-configurable YAML option to prohibit USE statements (CASSANDRA-17318)
 + * When streaming sees a ClosedChannelException this triggers the disk failure policy (CASSANDRA-17116)
 + * Add a virtual table for exposing prepared statements metrics (CASSANDRA-17224)
 + * Remove python 2.x support from cqlsh (CASSANDRA-17242)
 + * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958)
 + * Make capacity/validity/updateinterval/activeupdate for Auth Caches configurable via nodetool (CASSANDRA-17063)
 + * Added startup check for read_ahead_kb setting (CASSANDRA-16436)
 + * Avoid unecessary array allocations and initializations when performing query checks (CASSANDRA-17209)
 + * Add guardrail for list operations that require read before write (CASSANDRA-17154)
 + * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195)
 + * Remove self-reference in SSTableTidier (CASSANDRA-17205)
 + * Add guardrail for query page size (CASSANDRA-17189)
 + * Allow column_index_size_in_kb to be configurable through nodetool (CASSANDRA-17121)
 + * Emit a metric for number of local read and write calls
 + * Add non-blocking mode for CDC writes (CASSANDRA-17001)
 + * Add guardrails framework (CASSANDRA-17147)
 + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
 + * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082)
 + * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992)
 + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution (CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337)
 + * add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
 + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 (CASSANDRA-17040)
 +Merged from 4.0:
 +Merged from 3.11:
  Merged from 3.0:
   * Fix issue where frozen maps may not be serialized in the correct order (CASSANDRA-17623)
   * Suppress CVE-2022-24823 (CASSANDRA-17633)
diff --cc src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 71cec282ad,99315754bd..28a6accff7
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@@ -19,6 -19,9 +19,7 @@@ package org.apache.cassandra.repair
  
  import java.util.Collections;
  import java.util.List;
 -import java.util.UUID;
 -import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.atomic.AtomicBoolean;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
@@@ -40,7 -43,6 +41,9 @@@ import org.apache.cassandra.streaming.S
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.TimeUUID;
++import org.apache.cassandra.utils.concurrent.AsyncPromise;
++import org.apache.cassandra.utils.concurrent.Promise;
  
  /**
   * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
@@@ -58,11 -60,11 +61,11 @@@ public class LocalSyncTask extends Sync
      @VisibleForTesting
      public final boolean transferRanges;
  
-     private boolean active = true;
-     private StreamPlan streamPlan;
+     private final AtomicBoolean active = new AtomicBoolean(true);
 -    private final CompletableFuture<StreamPlan> planFuture = new CompletableFuture<>();
++    private final Promise<StreamPlan> planPromise = new AsyncPromise<>();
  
      public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
 -                         List<Range<Token>> diff, UUID pendingRepair,
 +                         List<Range<Token>> diff, TimeUUID pendingRepair,
                           boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
      {
          super(desc, local, remote, diff, previewKind);
@@@ -115,8 -117,9 +118,9 @@@
              logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
              Tracing.traceRepair(message);
  
-             streamPlan = createStreamPlan();
-             streamPlan.execute();
+             StreamPlan plan = createStreamPlan();
+             plan.execute();
 -            planFuture.complete(plan);
++            planPromise.setSuccess(plan);
          }
      }
  
@@@ -169,12 -171,11 +172,11 @@@
      }
  
      @Override
-     public synchronized void onFailure(Throwable t)
+     public void onFailure(Throwable t)
      {
-         if (active)
+         if (active.compareAndSet(true, false))
          {
-             active = false;
 -            setException(t);
 +            tryFailure(t);
              finished();
          }
      }
@@@ -191,22 -192,12 +193,12 @@@
      }
  
      @Override
-     public synchronized void abort()
+     public void abort()
      {
-         if (active)
 -        planFuture.whenComplete((plan, cause) ->
++        planPromise.addCallback((plan, cause) ->
          {
-             if (streamPlan == null)
-             {
-                 active = false;
-                 String message = String.format("Sync for session %s between %s and %s on %s aborted before starting",
-                                                desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
-                 logger.debug("{} {}", previewKind.logPrefix(desc.sessionId), message);
-                 trySuccess(stat);
-             }
-             else
-             {
-                 streamPlan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort);
-             }
-         }
+             assert plan != null : "StreamPlan future should never be completed exceptionally";
+             plan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort);
+         });
      }
  }
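The LocalSyncTask change above swaps a `synchronized` boolean flag for `AtomicBoolean.compareAndSet`, so failure handling runs at most once without holding a monitor, and it makes `abort()` register a callback that fires once the stream plan has been published. A minimal, self-contained sketch of that pattern follows. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for Cassandra's `Promise`/`AsyncPromise` (the 4.0 side of the merge uses `CompletableFuture.whenComplete` directly); the `Plan` class and all method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the once-only failure / deferred abort pattern; not Cassandra code.
class DeferredAbortSketch
{
    // Hypothetical stand-in for StreamPlan: it only records whether its sessions were aborted.
    static final class Plan
    {
        volatile boolean aborted;

        void abortAllSessions()
        {
            aborted = true;
        }
    }

    private final AtomicBoolean active = new AtomicBoolean(true);
    private final CompletableFuture<Plan> planFuture = new CompletableFuture<>();
    final AtomicInteger failuresHandled = new AtomicInteger();

    // Creates and publishes the plan; callbacks already registered by abort() run in this thread.
    void start()
    {
        planFuture.complete(new Plan());
    }

    // Only the first caller wins the compareAndSet, so the failure branch runs at most once.
    void onFailure(Throwable t)
    {
        if (active.compareAndSet(true, false))
            failuresHandled.incrementAndGet();
    }

    // Aborts immediately if the plan exists; otherwise the callback waits for start().
    void abort()
    {
        planFuture.whenComplete((plan, cause) -> {
            if (plan != null)
                plan.abortAllSessions();
        });
    }

    Plan planOrNull()
    {
        return planFuture.getNow(null);
    }

    public static void main(String[] args)
    {
        DeferredAbortSketch task = new DeferredAbortSketch();
        task.abort(); // requested before the plan exists
        task.start(); // the deferred abort callback fires here
        System.out.println(task.planOrNull().aborted); // true

        task.onFailure(new RuntimeException("first"));
        task.onFailure(new RuntimeException("second"));
        System.out.println(task.failuresHandled.get()); // 1
    }
}
```

One behavioral difference is visible in the removed lines: the old `synchronized abort()` completed the task itself (`trySuccess(stat)`) when no plan existed yet, whereas the new version only acts on the plan once it has been published.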
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 595890def8,b2739b331a..2036262e54
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -1084,42 -1037,18 +1084,48 @@@ public class StreamSession implements I
          public void onClose(InetAddressAndPort from);
      }
  
 +    public static String createLogTag(StreamSession session)
 +    {
 +        return createLogTag(session, (Object) null);
 +    }
 +
 +    public static String createLogTag(StreamSession session, StreamingChannel channel)
 +    {
 +        return createLogTag(session, channel == null ? null : channel.id());
 +    }
 +
 +    public static String createLogTag(StreamSession session, Channel channel)
 +    {
 +        return createLogTag(session, channel == null ? null : channel.id());
 +    }
 +
 +    public static String createLogTag(StreamSession session, Object channelId)
 +    {
 +        StringBuilder sb = new StringBuilder(64);
 +        sb.append("[Stream");
 +
 +        if (session != null)
 +            sb.append(" #").append(session.planId());
 +
 +        if (channelId != null)
 +            sb.append(" channel: ").append(channelId);
 +
 +        sb.append(']');
 +        return sb.toString();
 +    }
 +
      public synchronized void abort()
      {
+         if (state.isFinalState())
+         {
+             logger.debug("[Stream #{}] Stream session with peer {} is already in a final state on abort.", planId(), peer);
+             return;
+         }
+ 
          logger.info("[Stream #{}] Aborting stream session with peer {}...", planId(), peer);
  
 -        if (getMessageSender().connected())
 -            getMessageSender().sendMessage(new SessionFailedMessage());
 +        if (channel.connected())
 +            channel.sendControlMessage(new SessionFailedMessage());
  
          try
          {
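The new `createLogTag` overloads all delegate to the `Object channelId` variant, which produces a tag of the form `[Stream #<planId> channel: <id>]` and leaves out whichever part is null. The sketch below isolates just that formatting logic. It assumes a plain `java.util.UUID` plan id in place of a `StreamSession` (in 4.1, `planId()` returns Cassandra's `TimeUUID`); the class name is hypothetical.

```java
import java.util.UUID;

// Hypothetical standalone copy of the tag-building logic in StreamSession.createLogTag(session, channelId).
class LogTagSketch
{
    static String createLogTag(UUID planId, Object channelId)
    {
        StringBuilder sb = new StringBuilder(64);
        sb.append("[Stream");

        // Both parts are optional: a null plan id or channel id is simply omitted.
        if (planId != null)
            sb.append(" #").append(planId);

        if (channelId != null)
            sb.append(" channel: ").append(channelId);

        sb.append(']');
        return sb.toString();
    }

    public static void main(String[] args)
    {
        UUID planId = UUID.fromString("00000000-0000-0000-0000-000000000001");
        System.out.println(createLogTag(null, null));     // [Stream]
        System.out.println(createLogTag(planId, null));   // [Stream #00000000-0000-0000-0000-000000000001]
        System.out.println(createLogTag(planId, "3f2a")); // [Stream #00000000-0000-0000-0000-000000000001 channel: 3f2a]
    }
}
```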
diff --cc src/java/org/apache/cassandra/streaming/StreamingChannel.java
index f49089c48a,0000000000..a638638bc2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@@ -1,78 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.function.IntFunction;
 +
 +import io.netty.util.concurrent.Future; //checkstyle: permit this import
 +import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory;
 +import org.apache.cassandra.utils.Shared;
 +
 +import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
 +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 +
 +@Shared(scope = SIMULATION, inner = INTERFACES)
 +public interface StreamingChannel
 +{
 +    public interface Factory
 +    {
 +        public static class Global
 +        {
 +            private static StreamingChannel.Factory FACTORY = new NettyStreamingConnectionFactory();
 +            public static StreamingChannel.Factory streamingFactory()
 +            {
 +                return FACTORY;
 +            }
 +
 +            public static void unsafeSet(StreamingChannel.Factory factory)
 +            {
 +                FACTORY = factory;
 +            }
 +        }
 +
 +        StreamingChannel create(InetSocketAddress to, int messagingVersion, Kind kind) throws IOException;
++
++        default StreamingChannel create(InetSocketAddress to,
++                                        InetSocketAddress preferred,
++                                        int messagingVersion,
++                                        StreamingChannel.Kind kind) throws IOException
++        {
++            // Implementations can decide whether or not to do something with the preferred address.
++            return create(to, messagingVersion, kind);
++        }
 +    }
 +
 +    public enum Kind { CONTROL, FILE }
 +
 +    public interface Send
 +    {
 +        void send(IntFunction<StreamingDataOutputPlus> outSupplier) throws IOException;
 +    }
 +
 +    Object id();
 +    String description();
 +
 +    InetSocketAddress peer();
 +    InetSocketAddress connectedTo();
 +    boolean connected();
 +
 +    StreamingDataInputPlus in();
 +
 +    /**
 +     * until closed, cannot invoke {@link #send(Send)}
 +     */
 +    StreamingDataOutputPlus acquireOut();
 +    Future<?> send(Send send) throws IOException;
 +
 +    Future<?> close();
 +    void onClose(Runnable runOnClose);
 +}
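Declaring the new `preferred`-address overload of `StreamingChannel.Factory.create` as a `default` method keeps existing factories source-compatible: an implementation that only provides the original `create(to, messagingVersion, kind)` inherits a fallback that ignores the preferred address, while `NettyStreamingConnectionFactory` overrides the overload to pass it through. The sketch below shows that shape in reduced form; the interface, both factory classes, and the `String` return value (instead of a real channel) are hypothetical simplifications.

```java
import java.net.InetSocketAddress;

// Hypothetical reduction of StreamingChannel.Factory: returns a description instead of a channel.
interface FactorySketch
{
    String create(InetSocketAddress to, int messagingVersion);

    // Default overload: implementations that ignore the preferred address inherit this fallback.
    default String create(InetSocketAddress to, InetSocketAddress preferred, int messagingVersion)
    {
        return create(to, messagingVersion);
    }
}

// Implements only the original method, so the preferred address is silently dropped.
class LegacyFactory implements FactorySketch
{
    @Override
    public String create(InetSocketAddress to, int messagingVersion)
    {
        return "connect " + to.getHostString();
    }
}

// Overrides the new overload so the preferred address is actually used.
class PreferringFactory extends LegacyFactory
{
    @Override
    public String create(InetSocketAddress to, InetSocketAddress preferred, int messagingVersion)
    {
        return "connect " + preferred.getHostString() + " for " + to.getHostString();
    }
}

class FactorySketchDemo
{
    public static void main(String[] args)
    {
        InetSocketAddress to = InetSocketAddress.createUnresolved("node1", 7000);
        InetSocketAddress preferred = InetSocketAddress.createUnresolved("10.0.0.1", 7000);
        System.out.println(new LegacyFactory().create(to, preferred, 12));     // connect node1
        System.out.println(new PreferringFactory().create(to, preferred, 12)); // connect 10.0.0.1 for node1
    }
}
```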
diff --cc src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java
index 946df59892,0000000000..6a57e395e4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java
@@@ -1,76 -1,0 +1,85 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming.async;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelPipeline;
 +import io.netty.channel.EventLoop;
 +import io.netty.util.concurrent.Future; // checkstyle: permit this import
 +import org.apache.cassandra.net.ConnectionCategory;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.OutboundConnectionInitiator.Result;
 +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess;
 +import org.apache.cassandra.net.OutboundConnectionSettings;
 +import org.apache.cassandra.streaming.StreamingChannel;
 +
 +import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress;
 +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming;
 +
 +public class NettyStreamingConnectionFactory implements StreamingChannel.Factory
 +{
 +    @VisibleForTesting
 +    public static int MAX_CONNECT_ATTEMPTS = 3;
 +
 +    public static NettyStreamingChannel connect(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException
 +    {
 +        EventLoop eventLoop = MessagingService.instance().socketFactory.outboundStreamingGroup().next();
 +
 +        int attempts = 0;
 +        while (true)
 +        {
 +            Future<Result<StreamingSuccess>> result = initiateStreaming(eventLoop, template.withDefaults(ConnectionCategory.STREAMING), messagingVersion);
 +            result.awaitUninterruptibly(); // initiate has its own timeout, so this is "guaranteed" to return relatively promptly
 +            if (result.isSuccess())
 +            {
 +                Channel channel = result.getNow().success().channel;
 +                NettyStreamingChannel streamingChannel = new NettyStreamingChannel(messagingVersion, channel, kind);
 +                if (kind == StreamingChannel.Kind.CONTROL)
 +                {
 +                    ChannelPipeline pipeline = channel.pipeline();
 +                    pipeline.addLast("stream", streamingChannel);
 +                }
 +                return streamingChannel;
 +            }
 +
 +            if (++attempts == MAX_CONNECT_ATTEMPTS)
 +                throw new IOException("failed to connect to " + template.to + " for streaming data", result.cause());
 +        }
 +    }
 +
 +    @Override
 +    public StreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException
 +    {
 +        return connect(new OutboundConnectionSettings(getByAddress(to)), messagingVersion, kind);
 +    }
++
++    @Override
++    public StreamingChannel create(InetSocketAddress to,
++                                   InetSocketAddress preferred,
++                                   int messagingVersion,
++                                   StreamingChannel.Kind kind) throws IOException
++    {
++        return connect(new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred)), messagingVersion, kind);
++    }
 +}
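`NettyStreamingConnectionFactory.connect` retries the outbound streaming handshake up to `MAX_CONNECT_ATTEMPTS` (3) times and, once every attempt has failed, throws an `IOException` that wraps the last failure's cause. The sketch below restates that bounded-retry loop with exceptions in place of Netty futures. The `Attempt` interface and the class name are hypothetical; like the original loop, it retries immediately with no backoff and relies on each attempt having its own timeout.

```java
import java.io.IOException;

// Hypothetical exception-based restatement of the bounded retry loop in NettyStreamingConnectionFactory.connect.
class BoundedRetrySketch
{
    static int MAX_CONNECT_ATTEMPTS = 3;

    // Hypothetical stand-in for one initiateStreaming(...) handshake attempt.
    interface Attempt<T>
    {
        T run() throws IOException;
    }

    static <T> T connect(String target, Attempt<T> attempt) throws IOException
    {
        int attempts = 0;
        while (true)
        {
            try
            {
                return attempt.run();
            }
            catch (IOException e)
            {
                // Give up after the final attempt, keeping the last failure as the cause.
                if (++attempts == MAX_CONNECT_ATTEMPTS)
                    throw new IOException("failed to connect to " + target + " for streaming data", e);
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        int[] calls = {0};
        String result = connect("node1", () -> {
            if (++calls[0] < 3)
                throw new IOException("connection refused");
            return "connected";
        });
        System.out.println(result + " after " + calls[0] + " attempts"); // connected after 3 attempts
    }
}
```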
diff --cc src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index 4f3a443515,0000000000..c2e551edb6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@@ -1,435 -1,0 +1,449 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming.async;
 +
 +import java.io.IOError;
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.nio.channels.ClosedByInterruptException;
 +import java.util.Collection;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ScheduledFuture;
 +
 +import javax.annotation.Nullable;
 +
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.streaming.StreamDeserializingTask;
- import org.apache.cassandra.streaming.StreamingChannel;
- import org.apache.cassandra.streaming.StreamingDataOutputPlus;
- import org.apache.cassandra.utils.concurrent.ImmediateFuture;
- import org.apache.cassandra.utils.concurrent.Semaphore;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelFuture;
 +import io.netty.util.concurrent.Future; // checkstyle: permit this import
 +import org.apache.cassandra.concurrent.ExecutorPlus;
++import org.apache.cassandra.db.SystemKeyspace;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.streaming.StreamDeserializingTask;
++import org.apache.cassandra.streaming.StreamingChannel;
++import org.apache.cassandra.streaming.StreamingDataOutputPlus;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 +import org.apache.cassandra.streaming.messages.StreamMessage;
++import org.apache.cassandra.utils.concurrent.ImmediateFuture;
++import org.apache.cassandra.utils.concurrent.Semaphore;
 +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 +
 +import static com.google.common.base.Throwables.getRootCause;
 +import static java.lang.Integer.parseInt;
 +import static java.lang.String.format;
 +import static java.lang.System.getProperty;
 +import static java.lang.Thread.currentThread;
 +import static java.util.concurrent.TimeUnit.*;
++
 +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;
 +import static org.apache.cassandra.streaming.StreamSession.createLogTag;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.serialize;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.serializedSize;
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
 +import static org.apache.cassandra.utils.JVMStabilityInspector.inspectThrowable;
 +import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
 +import static org.apache.cassandra.utils.concurrent.Semaphore.newFairSemaphore;
 +
- import static org.apache.cassandra.utils.Clock.Global.nanoTime;
- 
 +/**
 + * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
 + * for sending {@link OutgoingStreamMessage} instances; all other {@link StreamMessage} types are sent via
 + * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck
 + * behind a stream transfer.
 + *
 + * One of the challenges when sending streams is we might need to delay shipping the stream if:
 + *
 + * - we've exceeded our network I/O use due to rate limiting (at the cassandra level)
 + * - the receiver isn't keeping up, which causes the local TCP socket buffer to not empty, which causes epoll writes to not
 + * move any bytes to the socket, which causes buffers to stick around in user-land (a/k/a cassandra) memory.
 + *
 + * When those conditions occur, it's easy enough to reschedule processing the stream once the resources pick up
 + * (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that
 + * no other messages are submitted to the same channel while the current stream is still being processed.
 + */
 +public class StreamingMultiplexedChannel
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StreamingMultiplexedChannel.class);
 +
 +    private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = getAvailableProcessors();
 +    private static final int MAX_PARALLEL_TRANSFERS = parseInt(getProperty(PROPERTY_PREFIX + "streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
 +
 +    // a simple mechanism for allowing a degree of fairness across multiple sessions
 +    private static final Semaphore fileTransferSemaphore = newFairSemaphore(DEFAULT_MAX_PARALLEL_TRANSFERS);
 +
 +    private final StreamingChannel.Factory factory;
 +    private final InetAddressAndPort to;
 +    private final StreamSession session;
 +    private final int messagingVersion;
 +
 +    private volatile boolean closed;
 +
 +    /**
 +     * A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an
 +     * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received).
 +     */
 +    private volatile StreamingChannel controlChannel;
 +
 +    // note: this really doesn't need to be a LBQ, just something that's thread safe
 +    private final Collection<ScheduledFuture<?>> channelKeepAlives = newBlockingQueue();
 +
 +    private final ExecutorPlus fileTransferExecutor;
 +
 +    /**
 +     * A mapping of each {@link #fileTransferExecutor} thread to a channel that can be written to (on that thread).
 +     */
 +    private final ConcurrentMap<Thread, StreamingChannel> threadToChannelMap = new ConcurrentHashMap<>();
 +
 +    public StreamingMultiplexedChannel(StreamSession session, StreamingChannel.Factory factory, InetAddressAndPort to, @Nullable StreamingChannel controlChannel, int messagingVersion)
 +    {
 +        this.session = session;
 +        this.factory = factory;
 +        this.to = to;
 +        this.messagingVersion = messagingVersion;
 +        this.controlChannel = controlChannel;
 +
 +        String name = session.peer.toString().replace(':', '.');
 +        fileTransferExecutor = executorFactory()
 +                .configurePooled("NettyStreaming-Outbound-" + name, MAX_PARALLEL_TRANSFERS)
 +                .withKeepAlive(1L, SECONDS).build();
 +    }
 +
 +
 +
 +    public InetAddressAndPort peer()
 +    {
 +        return to;
 +    }
 +
 +    public InetSocketAddress connectedTo()
 +    {
 +        return controlChannel == null ? to : controlChannel.connectedTo();
 +    }
 +
 +    /**
 +     * Used by initiator to setup control message channel connecting to follower
 +     */
 +    private void setupControlMessageChannel() throws IOException
 +    {
 +        if (controlChannel == null)
 +        {
 +            /*
 +             * Inbound handlers are needed:
 +             *  a) for initiator's control channel (the first outbound channel) to receive follower's message.
 +             *  b) for streaming receiver (note: both initiator and follower can receive streaming files) to receive files,
 +             *     in {@link Handler#setupStreamingPipeline}
 +             */
-             controlChannel = createChannel(StreamingChannel.Kind.CONTROL);
++            controlChannel = createControlChannel();
 +        }
 +    }
 +
-     private StreamingChannel createChannel(StreamingChannel.Kind kind) throws IOException
++    private StreamingChannel createControlChannel() throws IOException
 +    {
 +        logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
 +
-         StreamingChannel channel = factory.create(to, messagingVersion, kind);
-         if (kind == StreamingChannel.Kind.CONTROL)
-         {
-             executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
-                                           new StreamDeserializingTask(session, channel, messagingVersion));
-             session.attachInbound(channel);
-         }
++        StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
++        executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
++                                      new StreamDeserializingTask(session, channel, messagingVersion));
++        session.attachInbound(channel);
 +        session.attachOutbound(channel);
 +
-         logger.debug("Creating {}", channel.description());
++        logger.debug("Creating control {}", channel.description());
++        return channel;
++    }
++    
++    private StreamingChannel createFileChannel(InetAddressAndPort connectTo) throws IOException
++    {
++        logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
++
++        StreamingChannel channel = factory.create(to, connectTo, messagingVersion, StreamingChannel.Kind.FILE);
++        session.attachOutbound(channel);
++
++        logger.debug("Creating file {}", channel.description());
 +        return channel;
 +    }
 +
 +    public Future<?> sendControlMessage(StreamMessage message)
 +    {
 +        try
 +        {
 +            setupControlMessageChannel();
 +            return sendMessage(controlChannel, message);
 +        }
 +        catch (Exception e)
 +        {
 +            close();
 +            session.onError(e);
 +            return ImmediateFuture.failure(e);
 +        }
 +
 +    }
 +    public Future<?> sendMessage(StreamingChannel channel, StreamMessage message)
 +    {
 +        if (closed)
 +            throw new RuntimeException("stream has been closed, cannot send " + message);
 +
 +        if (message instanceof OutgoingStreamMessage)
 +        {
 +            if (session.isPreview())
 +                throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
 +            if (logger.isDebugEnabled())
 +                logger.debug("{} Sending {}", createLogTag(session), message);
-             return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage)message));
++
++            InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to);
++            return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo));
 +        }
 +
 +        try
 +        {
 +            Future<?> promise = channel.send(outSupplier -> {
 +                // we anticipate that the control messages are rather small, so allocating a ByteBuf shouldn't blow out of memory.
 +                long messageSize = serializedSize(message, messagingVersion);
 +                if (messageSize > 1 << 30)
 +                {
 +                    throw new IllegalStateException(format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s",
 +                                                           createLogTag(session, controlChannel.id()), messageSize, message.type));
 +                }
 +                try (StreamingDataOutputPlus out = outSupplier.apply((int) messageSize))
 +                {
 +                    StreamMessage.serialize(message, out, messagingVersion, session);
 +                }
 +            });
 +            promise.addListener(future -> onMessageComplete(future, message));
 +            return promise;
 +        }
 +        catch (Exception e)
 +        {
 +            close();
 +            session.onError(e);
 +            return ImmediateFuture.failure(e);
 +        }
 +    }
 +
 +    /**
 +     * Decides what to do after a {@link StreamMessage} is processed.
 +     *
 +     * Note: this is called from the netty event loop.
 +     *
 +     * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
 +     * the status of aborting any remaining tasks in the session.
 +     */
 +    Future<?> onMessageComplete(Future<?> future, StreamMessage msg)
 +    {
 +        Throwable cause = future.cause();
 +        if (cause == null)
 +            return null;
 +
 +        Channel channel = future instanceof ChannelFuture ? ((ChannelFuture)future).channel() : null;
 +        logger.error("{} failed to send a stream message/data to peer {}: msg = {}",
 +                     createLogTag(session, channel), to, msg, future.cause());
 +
 +        // StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try
 +        // to send any failure messages
 +        return session.onError(cause);
 +    }
 +
 +    class FileStreamTask implements Runnable
 +    {
 +        /**
 +         * Time interval, in minutes, to wait between logging a message indicating that we're waiting on a semaphore
 +         * permit to become available.
 +         */
 +        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
 +
 +        /**
 +         * Even though we expect only an {@link OutgoingStreamMessage} at runtime, the type here is {@link StreamMessage}
 +         * to facilitate simpler testing.
 +         */
 +        private final StreamMessage msg;
 +
-         FileStreamTask(OutgoingStreamMessage ofm)
++        private final InetAddressAndPort connectTo;
++
++        FileStreamTask(OutgoingStreamMessage ofm, InetAddressAndPort connectTo)
 +        {
 +            this.msg = ofm;
++            this.connectTo = connectTo;
 +        }
 +
 +        /**
 +         * For testing purposes
 +         */
 +        FileStreamTask(StreamMessage msg)
 +        {
 +            this.msg = msg;
++            this.connectTo = null;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
 +                return;
 +
 +            StreamingChannel channel = null;
 +            try
 +            {
-                 channel = getOrCreateChannel();
++                channel = getOrCreateFileChannel(connectTo);
 +
 +                // close the DataOutputStreamPlus as we're done with it - but don't close the channel
 +                try (StreamingDataOutputPlus out = channel.acquireOut())
 +                {
 +                    serialize(msg, out, messagingVersion, session);
 +                }
 +            }
 +            catch (Exception e)
 +            {
 +                session.onError(e);
 +            }
 +            catch (Throwable t)
 +            {
 +                if (closed && getRootCause(t) instanceof ClosedByInterruptException && fileTransferExecutor.isShutdown())
 +                {
 +                    logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", createLogTag(session, channel));
 +                }
 +                else
 +                {
 +                    inspectThrowable(t);
 +                    if (!session.state().isFinalState())
 +                        session.onError(t);
 +                }
 +            }
 +            finally
 +            {
 +                fileTransferSemaphore.release(1);
 +            }
 +        }
 +
 +        boolean acquirePermit(int logInterval)
 +        {
 +            long logIntervalNanos = MINUTES.toNanos(logInterval);
 +            long timeOfLastLogging = nanoTime();
 +            while (true)
 +            {
 +                if (closed)
 +                    return false;
 +                try
 +                {
 +                    if (fileTransferSemaphore.tryAcquire(1, 1, SECONDS))
 +                        return true;
 +
 +                    // log a helpful message to operators in case they are wondering why a given session might not be making progress.
 +                    long now = nanoTime();
 +                    if (now - timeOfLastLogging > logIntervalNanos)
 +                    {
 +                        timeOfLastLogging = now;
 +                        OutgoingStreamMessage ofm = (OutgoingStreamMessage)msg;
 +
 +                        if (logger.isInfoEnabled())
 +                            logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes",
 +                                        createLogTag(session), ofm.getName(), logInterval);
 +                    }
 +                }
 +                catch (InterruptedException e)
 +                {
 +                    throw new UncheckedInterruptedException(e);
 +                }
 +            }
 +        }
 +
-         private StreamingChannel getOrCreateChannel()
++        private StreamingChannel getOrCreateFileChannel(InetAddressAndPort connectTo)
 +        {
 +            Thread currentThread = currentThread();
 +            try
 +            {
 +                StreamingChannel channel = threadToChannelMap.get(currentThread);
 +                if (channel != null)
 +                    return channel;
 +
-                 channel = createChannel(StreamingChannel.Kind.FILE);
++                channel = createFileChannel(connectTo);
 +                threadToChannelMap.put(currentThread, channel);
 +                return channel;
 +            }
 +            catch (Exception e)
 +            {
 +                throw new IOError(e);
 +            }
 +        }
 +
 +        /**
 +         * For testing purposes
 +         */
 +        void injectChannel(StreamingChannel channel)
 +        {
 +            Thread currentThread = currentThread();
 +            if (threadToChannelMap.get(currentThread) != null)
 +                throw new IllegalStateException("previous channel already set");
 +
 +            threadToChannelMap.put(currentThread, channel);
 +        }
 +
 +        /**
 +         * For testing purposes
 +         */
 +        void unsetChannel()
 +        {
 +            threadToChannelMap.remove(currentThread());
 +        }
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void setClosed()
 +    {
 +        closed = true;
 +    }
 +
 +    void setControlChannel(NettyStreamingChannel channel)
 +    {
 +        controlChannel = channel;
 +    }
 +
 +    int semaphoreAvailablePermits()
 +    {
 +        return fileTransferSemaphore.permits();
 +    }
 +
 +    public boolean connected()
 +    {
 +        return !closed && (controlChannel == null || controlChannel.connected());
 +    }
 +
 +    public void close()
 +    {
 +        if (closed)
 +            return;
 +
 +        closed = true;
 +        if (logger.isDebugEnabled())
 +            logger.debug("{} Closing stream connection channels on {}", createLogTag(session), to);
 +        for (ScheduledFuture<?> future : channelKeepAlives)
 +            future.cancel(false);
 +        channelKeepAlives.clear();
 +
 +        threadToChannelMap.values().forEach(StreamingChannel::close);
 +        threadToChannelMap.clear();
 +        fileTransferExecutor.shutdownNow();
 +    }
 +}
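The central change in StreamingMultiplexedChannel is that SystemKeyspace.getPreferredIP(to) is now called in sendMessage(), on the caller's thread, and the resolved address is passed into FileStreamTask, rather than being looked up while the file channel is created on a NettyStreaming-Outbound pool thread. Because close() calls fileTransferExecutor.shutdownNow(), which interrupts those pool threads, keeping the system-table read off them means such an interrupt cannot land in the middle of it. A minimal, self-contained sketch of that resolve-before-submit pattern follows; preferredAddress() is a hypothetical stand-in for the preferred-IP lookup and performs no real I/O.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ResolveBeforeSubmitSketch
{
    /** Name of the thread that last ran the (hypothetical) preferred-address lookup. */
    static volatile String lookupThread;

    /** Hypothetical stand-in for SystemKeyspace.getPreferredIP(to); real code would read system.peers_v2. */
    static String preferredAddress(String peer)
    {
        lookupThread = Thread.currentThread().getName();
        return peer + "-preferred";
    }

    /** Resolves the address on the calling thread, then submits only the transfer to the pool. */
    static String sendFile(ExecutorService transfers, String peer)
    {
        // Runs before the task exists, so interrupts aimed at pool threads cannot reach the lookup
        String connectTo = preferredAddress(peer);
        try
        {
            return transfers.submit(() -> "streaming to " + connectTo).get();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            throw new IllegalStateException(e);
        }
        catch (ExecutionException e)
        {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args)
    {
        // Pool threads are named like the outbound streaming threads purely for illustration
        ExecutorService transfers = Executors.newFixedThreadPool(2, r -> new Thread(r, "NettyStreaming-Outbound-sketch"));
        try
        {
            System.out.println(sendFile(transfers, "127.0.0.2"));
            System.out.println("lookup ran on: " + lookupThread);
        }
        finally
        {
            transfers.shutdownNow(); // interrupts pool threads, much as close() does in the real class
        }
    }
}
```

Running main() prints the transfer result and reports that the lookup ran on the calling thread (main), not on a pool thread, which is the property the diff relies on.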
diff --cc src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index 354ed941c9,7db2aa68b3..eb34cef1ef
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@@ -43,12 -40,12 +43,29 @@@ public class BulkLoadConnectionFactory 
          this.outboundBindAny = outboundBindAny;
      }
  
 -    public Channel createConnection(OutboundConnectionSettings template, int messagingVersion) throws IOException
++    @Override
 +    public NettyStreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException
++    {
++        OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to));
++        return create(template, messagingVersion, kind);
++    }
++
++    @Override
++    public StreamingChannel create(InetSocketAddress to,
++                                   InetSocketAddress preferred,
++                                   int messagingVersion,
++                                   StreamingChannel.Kind kind) throws IOException
++    {
++        // Supply a preferred address to the template, which will be overwritten if encryption is configured.
++        OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred));
++        return create(template, messagingVersion, kind);
++    }
++
++    private NettyStreamingChannel create(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException
      {
          // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none'
          // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader
--        // does not know which node is in which dc/rack, connecting to SSL port is always the option.
-         OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to));
 -
++        // does not know which node is in which dc/rack, connecting to SSL port is always the option. 
          if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none)
              template = template.withConnectTo(template.to.withPort(secureStoragePort)).withEncryption(encryptionOptions);
  
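The new two-address create() overload in BulkLoadConnectionFactory seeds the template with the peer's preferred address, but, as its comment says, the encryption branch in the shared private create() rebuilds connectTo from template.to on the secure storage port, so the preferred address is dropped whenever internode encryption is enabled. A minimal sketch of that precedence follows; ConnectionTemplate is a hypothetical record standing in for OutboundConnectionSettings (it is not Cassandra's API), and the real null/none check on encryptionOptions is reduced to a boolean. It needs Java 16+ for the record.

```java
import java.net.InetSocketAddress;

/** Hypothetical stand-in for OutboundConnectionSettings: peer identity, dial address, encryption flag. */
record ConnectionTemplate(InetSocketAddress to, InetSocketAddress connectTo, boolean encrypted)
{
    ConnectionTemplate withConnectTo(InetSocketAddress address)
    {
        return new ConnectionTemplate(to, address, encrypted);
    }

    ConnectionTemplate withEncryption()
    {
        return new ConnectionTemplate(to, connectTo, true);
    }
}

public class PreferredAddressSketch
{
    /** Preferred address first; if encryption is on, dial `to` on the secure port instead. */
    static ConnectionTemplate template(InetSocketAddress to, InetSocketAddress preferred,
                                       boolean internodeEncryption, int secureStoragePort)
    {
        ConnectionTemplate template = new ConnectionTemplate(to, preferred, false);
        if (internodeEncryption)
        {
            // Rebuilt from `to`, not from `preferred`, so the preferred address is overwritten
            template = template.withConnectTo(new InetSocketAddress(to.getAddress(), secureStoragePort))
                               .withEncryption();
        }
        return template;
    }

    public static void main(String[] args)
    {
        InetSocketAddress to = new InetSocketAddress("127.0.0.2", 7000);
        InetSocketAddress preferred = new InetSocketAddress("10.0.0.2", 7000);
        System.out.println(template(to, preferred, false, 7001).connectTo()); // the preferred address
        System.out.println(template(to, preferred, true, 7001).connectTo());  // 127.0.0.2 on the secure port
    }
}
```

The ports and addresses are illustrative only; the point is the ordering, where encryption wins over the preferred address.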
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java
index c3c2b1427d,37c9171542..74a5c6f51d
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java
@@@ -27,25 -27,24 +27,30 @@@ import net.bytebuddy.ByteBuddy
  import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
  import net.bytebuddy.implementation.MethodDelegation;
  import net.bytebuddy.implementation.bind.annotation.SuperCall;
 +import org.assertj.core.api.Assertions;
  import org.junit.Test;
  
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.CompactionIterator;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.db.streaming.CassandraIncomingFile;
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
  import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.utils.TimeUUID;
  
  import static net.bytebuddy.matcher.ElementMatchers.named;
 -import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
  
@@@ -54,34 -53,6 +59,33 @@@ import static org.apache.cassandra.dist
  
  public class RepairErrorsTest extends TestBaseImpl
  {
 +    @Test
 +    public void testRemoteValidationFailure() throws IOException
 +    {
 +        Cluster.Builder builder = Cluster.build(2)
 +                                         .withConfig(config -> config.with(GOSSIP).with(NETWORK))
 +                                         .withInstanceInitializer(ByteBuddyHelper::install);
 +        try (Cluster cluster = builder.createWithoutStarting())
 +        {
 +            cluster.setUncaughtExceptionsFilter((i, throwable) -> {
 +                if (i == 2)
 +                    return throwable.getMessage() != null && throwable.getMessage().contains("IGNORE");
 +                return false;
 +            });
 +
 +            cluster.startup();
 +            init(cluster);
 +
 +            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)");
 +            for (int i = 0; i < 10; i++)
 +                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +            long mark = cluster.get(1).logs().mark();
 +            cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().failure());
 +            Assertions.assertThat(cluster.get(1).logs().grep(mark, "^ERROR").getResult()).isEmpty();
 +        }
 +    }
 +
-     @SuppressWarnings("Convert2MethodRef")
      @Test
      public void testRemoteSyncFailure() throws Exception
      {
@@@ -124,62 -95,65 +128,111 @@@
              result = cluster.get(1).nodetoolResult("repair", KEYSPACE);
              result.asserts().success();
  
-             // Make sure we've cleaned up sessions and parent sessions:
-             Integer parents = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount());
-             assertEquals(0, parents.intValue());
-             Integer sessions = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.sessionCount());
-             assertEquals(0, sessions.intValue());
+             assertNoActiveRepairSessions(cluster.get(1));
 +
 +            cluster.forEach(i -> Assertions.assertThat(i.logs().grep("SomeRepairFailedException").getResult())
 +                                           .describedAs("node%d logged hidden exception org.apache.cassandra.repair.SomeRepairFailedException", i.config().num())
 +                                           .isEmpty());
          }
      }
  
+     @Test
+     public void testRemoteStreamFailure() throws Exception
+     {
+         try (Cluster cluster = init(Cluster.build(3)
+                                            .withConfig(config -> config.with(GOSSIP, NETWORK)
+                                                                        .set("disk_failure_policy", "stop")
+                                                                        .set("disk_access_mode", "mmap_index_only"))
+                                            .withInstanceInitializer(ByteBuddyHelperStreamFailure::installStreamHandlingFailure).start()))
+         {
+             // Make sure we don't auto-compact the peers table. We'll need to try it manually later.
+             cluster.get(1).runOnInstance(() -> {
+                 ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers_v2");
+                 cfs.disableAutoCompaction();
+             });
+ 
+             cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, x int)");
+ 
+             // On repair, this data layout will require two (local) syncs from node 1 and one remote sync from node 2:
+             cluster.get(1).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 1, 1);
+             cluster.get(2).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 2, 2);
+             cluster.get(3).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 3, 3);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             // Flush system.peers_v2, or there won't be any SSTables...
+             cluster.forEach(i -> i.flush("system"));
+ 
+             // Stream reading will fail on node 3, and this will interrupt node 1 just as it starts to stream to node 2.
+             NodeToolResult result = cluster.get(1).nodetoolResult("repair", KEYSPACE);
+             result.asserts().failure();
+ 
+             // Ensure that the peers table is compactable even after the file streaming task is interrupted.
+             cluster.get(1).runOnInstance(() -> {
+                 ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers_v2");
+                 cfs.forceMajorCompaction();
+             });
+ 
+             assertTrue(cluster.get(1).logs().grep("Stopping transports as disk_failure_policy is stop").getResult().isEmpty());
+             assertTrue(cluster.get(1).logs().grep("FSReadError").getResult().isEmpty());
+ 
+             assertNoActiveRepairSessions(cluster.get(1));
+         }
+     }
+ 
 +    @Test
 +    public void testNoSuchRepairSessionAnticompaction() throws IOException
 +    {
 +        try (Cluster cluster = init(Cluster.build(2)
 +                                           .withConfig(config -> config.with(GOSSIP).with(NETWORK))
 +                                           .withInstanceInitializer(ByteBuddyHelper::installACNoSuchRepairSession)
 +                                           .start()))
 +        {
 +            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)");
 +            for (int i = 0; i < 10; i++)
 +                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +            long mark = cluster.get(1).logs().mark();
 +            cluster.forEach(i -> i.nodetoolResult("repair", KEYSPACE).asserts().failure());
 +            assertTrue(cluster.get(1).logs().grep(mark, "^ERROR").getResult().isEmpty());
 +        }
 +    }
 +
+     @SuppressWarnings("Convert2MethodRef")
+     private void assertNoActiveRepairSessions(IInvokableInstance instance)
+     {
+         // Make sure we've cleaned up sessions and parent sessions:
+         Integer parents = instance.callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount());
+         assertEquals(0, parents.intValue());
+         Integer sessions = instance.callOnInstance(() -> ActiveRepairService.instance.sessionCount());
+         assertEquals(0, sessions.intValue());
+     }
+ 
      public static class ByteBuddyHelper
      {
 +        public static void install(ClassLoader cl, int nodeNumber)
 +        {
 +            if (nodeNumber == 2)
 +            {
 +                new ByteBuddy().redefine(CompactionIterator.class)
 +                               .method(named("next"))
 +                               .intercept(MethodDelegation.to(ByteBuddyHelper.class))
 +                               .make()
 +                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
 +            }
 +        }
 +
 +        public static void installACNoSuchRepairSession(ClassLoader cl, int nodeNumber)
 +        {
 +            if (nodeNumber == 2)
 +            {
 +                new ByteBuddy().redefine(CompactionManager.class)
 +                               .method(named("validateSSTableBoundsForAnticompaction"))
 +                               .intercept(MethodDelegation.to(ByteBuddyHelper.class))
 +                               .make()
 +                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
 +            }
 +        }
 +        
          public static void installStreamPlanExecutionFailure(ClassLoader cl, int nodeNumber)
          {
              if (nodeNumber == 2)
@@@ -238,5 -205,64 +291,54 @@@
  
              return zuper.call();
          }
 -
 -        @SuppressWarnings({"unused", "ResultOfMethodCallIgnored"})
 -        public static Throwable extractThrowable(Future<?> future, @SuperCall Callable<Throwable> zuper) throws Exception
 -        {
 -            if (Thread.currentThread().getName().contains("RepairJobTask"))
 -                // Clear the interrupt flag so the FSReadError is propagated correctly in DebuggableThreadPoolExecutor:
 -                Thread.interrupted();
 -            
 -            return zuper.call();
 -        }
      }
+ 
+     public static class ByteBuddyHelperStreamFailure
+     {
+         public static void installStreamHandlingFailure(ClassLoader cl, int nodeNumber)
+         {
+             if (nodeNumber == 3)
+             {
+                 new ByteBuddy().rebase(CassandraIncomingFile.class)
 -                        .method(named("read"))
 -                        .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
 -                        .make()
 -                        .load(cl, ClassLoadingStrategy.Default.INJECTION);
++                               .method(named("read"))
++                               .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
++                               .make()
++                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+             }
+ 
+             if (nodeNumber == 1)
+             {
+                 new ByteBuddy().rebase(SystemKeyspace.class)
 -                        .method(named("getPreferredIP"))
 -                        .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
 -                        .make()
 -                        .load(cl, ClassLoadingStrategy.Default.INJECTION);
++                               .method(named("getPreferredIP"))
++                               .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
++                               .make()
++                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+             }
+         }
+ 
+         @SuppressWarnings("unused")
+         public static void read(DataInputPlus in, int version) throws IOException
+         {
+             throw new IOException("Failing incoming file read from test!");
+         }
+ 
+         @SuppressWarnings("unused")
+         public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep, @SuperCall Callable<InetAddressAndPort> zuper) throws Exception
+         {
 -            if (Thread.currentThread().getName().contains("NettyStreaming-Outbound") && ep.address.toString().contains("127.0.0.2"))
++            if (Thread.currentThread().getName().contains("NettyStreaming-Outbound") && ep.getAddress().toString().contains("127.0.0.2"))
+             {
+                 try
+                 {
+                     TimeUnit.SECONDS.sleep(10);
+                 }
+                 catch (InterruptedException e)
+                 {
+                     // Leave the interrupt flag intact for the ChannelProxy downstream...
+                     Thread.currentThread().interrupt();
+                 }
+             }
+ 
+             return zuper.call();
+         }
+     }
  }
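Background on why the `getPreferredIP` interceptor above restores the interrupt flag instead of swallowing it (an illustrative sketch, not part of this commit): `java.nio.channels.FileChannel` is an `InterruptibleChannel`, and the JDK documents that if a thread whose interrupt status is already set invokes a blocking I/O operation on such a channel, the channel is closed and the thread receives `ClosedByInterruptException`. Because the channel is closed for every holder, not just the interrupted thread, one interrupted task can break a file channel that other code shares. This appears to be the failure mode behind the CASSANDRA-17663 entry about the shared system table channel proxy. The JDK-only program below reproduces that behavior; `InterruptClosesChannel` and `readWhileInterrupted` are hypothetical names, and the sketch does not use Cassandra's `ChannelProxy`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class: shows that a pending interrupt closes a shared FileChannel.
public class InterruptClosesChannel
{
    /**
     * Reads from the channel while the current thread's interrupt flag is set,
     * then reports whether the channel is still open for other users.
     */
    static boolean readWhileInterrupted(FileChannel channel) throws IOException
    {
        // Simulate a flag left set by an interrupted task (as in the test helper).
        Thread.currentThread().interrupt();
        try
        {
            channel.read(ByteBuffer.allocate(16), 0);
        }
        catch (ClosedByInterruptException e)
        {
            // Expected: the JDK has closed the channel itself, for every holder.
        }
        finally
        {
            // Clear the flag so the rest of this demo thread is unaffected.
            Thread.interrupted();
        }
        return channel.isOpen();
    }

    public static void main(String[] args) throws IOException
    {
        Path file = Files.createTempFile("channel-proxy-demo", ".db");
        Files.write(file, new byte[]{ 1, 2, 3, 4 });
        try (FileChannel shared = FileChannel.open(file, StandardOpenOption.READ))
        {
            boolean open = readWhileInterrupted(shared);
            System.out.println("shared channel still open? " + open);
        }
        finally
        {
            Files.deleteIfExists(file);
        }
    }
}
```

Running `main` prints that the shared channel is no longer open (`false`). The `finally` block clears the flag only so the demo thread can continue; the test helper above deliberately does the opposite and re-sets the flag so that the downstream channel access observes it.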

