This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f6fcc3  Avoid race condition when completing stream sessions
9f6fcc3 is described below

commit 9f6fcc340d89eecc000765f6ab93e862f53a02d9
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
AuthorDate: Fri Mar 20 15:56:53 2020 +0800

    Avoid race condition when completing stream sessions
    
    patch by ZhaoYang; reviewed by Sergio Bossa and Benjamin Lerer for
    CASSANDRA-15666
---
 .circleci/config.yml                               | 174 +++++-----
 CHANGES.txt                                        |   1 +
 .../streaming/CassandraCompressedStreamWriter.java |   2 +-
 .../db/streaming/CassandraStreamWriter.java        |   2 +-
 .../org/apache/cassandra/io/util/ChannelProxy.java |  10 +
 .../apache/cassandra/net/OutboundConnection.java   |   4 +-
 .../org/apache/cassandra/service/QueryState.java   |   2 +-
 .../cassandra/streaming/StreamCoordinator.java     |  15 +-
 .../apache/cassandra/streaming/StreamManager.java  |  28 +-
 .../org/apache/cassandra/streaming/StreamPlan.java |   4 +-
 .../cassandra/streaming/StreamResultFuture.java    |  31 +-
 .../apache/cassandra/streaming/StreamSession.java  | 352 +++++++++++++++------
 .../async/NettyStreamingMessageSender.java         |  60 +++-
 .../streaming/async/StreamingInboundHandler.java   |  17 +-
 .../streaming/messages/CompleteMessage.java        |   2 +-
 .../streaming/messages/IncomingStreamMessage.java  |   4 +-
 .../streaming/messages/KeepAliveMessage.java       |   2 +-
 .../streaming/messages/OutgoingStreamMessage.java  |   2 +-
 .../streaming/messages/PrepareAckMessage.java      |   2 +-
 .../streaming/messages/PrepareSynAckMessage.java   |   2 +-
 .../streaming/messages/PrepareSynMessage.java      |   2 +-
 .../streaming/messages/ReceivedMessage.java        |   2 +-
 .../streaming/messages/SessionFailedMessage.java   |   2 +-
 .../streaming/messages/StreamInitMessage.java      |   2 +-
 .../streaming/messages/StreamMessage.java          |   6 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  75 +++++
 .../cassandra/distributed/test/StreamingTest.java  | 107 +++++++
 .../microbench/ZeroCopyStreamingBenchmark.java     |   4 +-
 .../CassandraEntireSSTableStreamWriterTest.java    |   4 +-
 .../db/streaming/CassandraStreamManagerTest.java   |   1 +
 .../apache/cassandra/dht/StreamStateStoreTest.java |   4 +-
 ...ntireSSTableStreamingCorrectFilesCountTest.java |   9 +-
 .../streaming/StreamTransferTaskTest.java          |  10 +-
 .../async/NettyStreamingMessageSenderTest.java     |   6 +-
 .../async/StreamingInboundHandlerTest.java         |   6 +-
 35 files changed, 673 insertions(+), 283 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 7f6c93b..7231d67 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -3,7 +3,7 @@ jobs:
   j8_jvm_upgrade_dtests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -85,8 +85,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -94,7 +94,7 @@ jobs:
   j8_cqlsh-dtests-py2-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -162,8 +162,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -171,7 +171,7 @@ jobs:
   j11_unit_tests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -253,8 +253,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -263,7 +263,7 @@ jobs:
   j8_cqlsh-dtests-py38-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -331,8 +331,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -340,7 +340,7 @@ jobs:
   j11_cqlsh-dtests-py3-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -408,8 +408,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -418,7 +418,7 @@ jobs:
   j11_cqlsh-dtests-py3-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -486,8 +486,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -496,7 +496,7 @@ jobs:
   j11_cqlsh-dtests-py38-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -564,8 +564,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -574,7 +574,7 @@ jobs:
   j8_cqlsh-dtests-py3-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -642,8 +642,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -651,7 +651,7 @@ jobs:
   j8_cqlsh-dtests-py2-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -719,8 +719,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -728,7 +728,7 @@ jobs:
   j11_cqlsh-dtests-py2-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -796,8 +796,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -806,7 +806,7 @@ jobs:
   j11_dtests-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -877,8 +877,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -887,7 +887,7 @@ jobs:
   j8_dtests-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -936,8 +936,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -945,7 +945,7 @@ jobs:
   j8_upgradetests-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1035,8 +1035,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1044,7 +1044,7 @@ jobs:
   utests_stress:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1080,8 +1080,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1089,7 +1089,7 @@ jobs:
   j8_unit_tests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1171,8 +1171,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1180,7 +1180,7 @@ jobs:
   j11_jvm_dtests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1262,8 +1262,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1272,7 +1272,7 @@ jobs:
   j11_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1343,8 +1343,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1353,7 +1353,7 @@ jobs:
   j11_cqlsh-dtests-py2-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1421,8 +1421,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1431,7 +1431,7 @@ jobs:
   j8_dtests-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1480,8 +1480,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1489,7 +1489,7 @@ jobs:
   j11_cqlsh-dtests-py38-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1557,8 +1557,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1567,7 +1567,7 @@ jobs:
   j8_jvm_dtests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1649,8 +1649,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1658,7 +1658,7 @@ jobs:
   j8_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1729,8 +1729,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1738,7 +1738,7 @@ jobs:
   j8_cqlsh-dtests-py3-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1806,8 +1806,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1815,7 +1815,7 @@ jobs:
   j8_cqlsh-dtests-py38-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -1883,8 +1883,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1892,7 +1892,7 @@ jobs:
   utests_long:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1928,8 +1928,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1937,7 +1937,7 @@ jobs:
   utests_fqltool:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -1973,8 +1973,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1982,7 +1982,7 @@ jobs:
   j11_dtests-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -2053,8 +2053,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -2063,7 +2063,7 @@ jobs:
   utests_compression:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 4
@@ -2145,8 +2145,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -2154,7 +2154,7 @@ jobs:
   j8_dtest_jars_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -2213,8 +2213,8 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
     - CCM_MAX_HEAP_SIZE: 1024M
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
diff --git a/CHANGES.txt b/CHANGES.txt
index b9c8f8d..cfc1f4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Avoid race condition when completing stream sessions (CASSANDRA-15666)
  * Flush with fast compressors by default (CASSANDRA-15379)
  * Fix CqlInputFormat regression from the switch to system.size_estimates 
(CASSANDRA-15637)
  * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index 21406b2..d92314b 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -62,7 +62,7 @@ public class CassandraCompressedStreamWriter extends 
CassandraStreamWriter
         long totalSize = totalSize();
         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = 
{}, totalSize = {}", session.planId(),
                      sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
-        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+        try (ChannelProxy fc = sstable.getDataChannel().newChannel())
         {
             long progress = 0L;
 
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index ffc663d..8382f0a 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -83,7 +83,7 @@ public class CassandraStreamWriter
                      sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
 
         AsyncStreamingOutputPlus out = (AsyncStreamingOutputPlus) output;
-        try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy();
+        try(ChannelProxy proxy = sstable.getDataChannel().newChannel();
             ChecksumValidator validator = new 
File(sstable.descriptor.filenameFor(Component.CRC)).exists()
                                           ? 
DataIntegrityMetadata.checksumValidator(sstable.descriptor)
                                           : null)
diff --git a/src/java/org/apache/cassandra/io/util/ChannelProxy.java 
b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
index 91bb03b..9ff46b7 100644
--- a/src/java/org/apache/cassandra/io/util/ChannelProxy.java
+++ b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
@@ -111,6 +111,16 @@ public final class ChannelProxy extends SharedCloseableImpl
         }
     }
 
+    /**
+     * {@link #sharedCopy()} can not be used if thread will be interruped, as 
the backing channel will be closed.
+     *
+     * @return a new channel instance
+     */
+    public final ChannelProxy newChannel()
+    {
+        return new ChannelProxy(filePath);
+    }
+
     public ChannelProxy sharedCopy()
     {
         return new ChannelProxy(this);
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java 
b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 0503259..b84ebc3 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -1561,8 +1561,8 @@ public class OutboundConnection
         Established established = state.established();
         Channel channel = established.channel;
         OutboundConnectionSettings settings = established.settings;
-        return SocketFactory.channelId(settings.from, (InetSocketAddress) 
channel.remoteAddress(),
-                                       settings.to, (InetSocketAddress) 
channel.localAddress(),
+        return SocketFactory.channelId(settings.from, (InetSocketAddress) 
channel.localAddress(),
+                                       settings.to, (InetSocketAddress) 
channel.remoteAddress(),
                                        type, channel.id().asShortText());
     }
 
diff --git a/src/java/org/apache/cassandra/service/QueryState.java 
b/src/java/org/apache/cassandra/service/QueryState.java
index 26f58bf..adb13b5 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -68,7 +68,7 @@ public class QueryState
     public long getTimestamp()
     {
         if (timestamp == Long.MIN_VALUE)
-            timestamp = clientState.getTimestamp();
+            timestamp = ClientState.getTimestamp();
         return timestamp;
     }
 
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java 
b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 6d757b6..c8ebabb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -42,17 +42,19 @@ public class StreamCoordinator
     private final Map<InetAddressAndPort, HostStreamingData> peerSessions = 
new HashMap<>();
     private final StreamOperation streamOperation;
     private final int connectionsPerHost;
+    private final boolean follower;
     private StreamConnectionFactory factory;
     private Iterator<StreamSession> sessionsToConnect = null;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
     public StreamCoordinator(StreamOperation streamOperation, int 
connectionsPerHost, StreamConnectionFactory factory,
-                             boolean connectSequentially, UUID pendingRepair, 
PreviewKind previewKind)
+                             boolean follower, boolean connectSequentially, 
UUID pendingRepair, PreviewKind previewKind)
     {
         this.streamOperation = streamOperation;
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
+        this.follower = follower;
         this.connectSequentially = connectSequentially;
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
@@ -86,9 +88,9 @@ public class StreamCoordinator
         return results;
     }
 
-    public boolean isReceiving()
+    public boolean isFollower()
     {
-        return connectionsPerHost == 0;
+        return follower;
     }
 
     public void connect(StreamResultFuture future)
@@ -272,8 +274,7 @@ public class StreamCoordinator
         {
             for (StreamSession session : streamSessions.values())
             {
-                StreamSession.State state = session.state();
-                if (state != StreamSession.State.COMPLETE && state != 
StreamSession.State.FAILED)
+                if (!session.state().isFinalState())
                     return true;
             }
             return false;
@@ -284,7 +285,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(streamOperation, 
peer, factory, streamSessions.size(),
+                StreamSession session = new StreamSession(streamOperation, 
peer, factory, isFollower(), streamSessions.size(),
                                                           pendingRepair, 
previewKind);
                 streamSessions.put(++lastReturned, session);
                 return session;
@@ -317,7 +318,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(streamOperation, peer, factory, 
id, pendingRepair, previewKind);
+                session = new StreamSession(streamOperation, peer, factory, 
isFollower(), id, pendingRepair, previewKind);
                 streamSessions.put(id, session);
             }
             return session;
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java 
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 81c65c5..da77ad2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -106,12 +106,12 @@ public class StreamManager implements StreamManagerMBean
      * We manage them in two different maps to distinguish plan from initiated 
ones to
      * receiving ones withing the same JVM.
      */
-    private final Map<UUID, StreamResultFuture> initiatedStreams = new 
NonBlockingHashMap<>();
-    private final Map<UUID, StreamResultFuture> receivingStreams = new 
NonBlockingHashMap<>();
+    private final Map<UUID, StreamResultFuture> initiatorStreams = new 
NonBlockingHashMap<>();
+    private final Map<UUID, StreamResultFuture> followerStreams = new 
NonBlockingHashMap<>();
 
     public Set<CompositeData> getCurrentStreams()
     {
-        return 
Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), 
receivingStreams.values()), new Function<StreamResultFuture, CompositeData>()
+        return 
Sets.newHashSet(Iterables.transform(Iterables.concat(initiatorStreams.values(), 
followerStreams.values()), new Function<StreamResultFuture, CompositeData>()
         {
             public CompositeData apply(StreamResultFuture input)
             {
@@ -120,7 +120,7 @@ public class StreamManager implements StreamManagerMBean
         }));
     }
 
-    public void register(final StreamResultFuture result)
+    public void registerInitiator(final StreamResultFuture result)
     {
         result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or 
not)
@@ -128,14 +128,14 @@ public class StreamManager implements StreamManagerMBean
         {
             public void run()
             {
-                initiatedStreams.remove(result.planId);
+                initiatorStreams.remove(result.planId);
             }
         }, MoreExecutors.directExecutor());
 
-        initiatedStreams.put(result.planId, result);
+        initiatorStreams.put(result.planId, result);
     }
 
-    public StreamResultFuture registerReceiving(final StreamResultFuture 
result)
+    public StreamResultFuture registerFollower(final StreamResultFuture result)
     {
         result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or 
not)
@@ -143,17 +143,17 @@ public class StreamManager implements StreamManagerMBean
         {
             public void run()
             {
-                receivingStreams.remove(result.planId);
+                followerStreams.remove(result.planId);
             }
         }, MoreExecutors.directExecutor());
 
-        StreamResultFuture previous = 
receivingStreams.putIfAbsent(result.planId, result);
+        StreamResultFuture previous = 
followerStreams.putIfAbsent(result.planId, result);
         return previous ==  null ? result : previous;
     }
 
     public StreamResultFuture getReceivingStream(UUID planId)
     {
-        return receivingStreams.get(planId);
+        return followerStreams.get(planId);
     }
 
     public void addNotificationListener(NotificationListener listener, 
NotificationFilter filter, Object handback)
@@ -178,11 +178,15 @@ public class StreamManager implements StreamManagerMBean
 
     public StreamSession findSession(InetAddressAndPort peer, UUID planId, int 
sessionIndex)
     {
-        StreamSession session = findSession(initiatedStreams, peer, planId, 
sessionIndex);
+        // Search follower session first, because in some tests, eg. 
StreamingTransferTest, both initiator session
+        // and follower session are listening to local host.
+        // TODO CASSANDRA-15665 it's more robust to add "isFollower" flag into 
{@link  StreamMessageHeader} to distinguish
+        // initiator session and follower session.
+        StreamSession session = findSession(followerStreams, peer, planId, 
sessionIndex);
         if (session !=  null)
             return session;
 
-        return findSession(receivingStreams, peer, planId, sessionIndex);
+        return findSession(initiatorStreams, peer, planId, sessionIndex);
     }
 
     private StreamSession findSession(Map<UUID, StreamResultFuture> streams, 
InetAddressAndPort peer, UUID planId, int sessionIndex)
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 3fcabd0..60845fa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -64,7 +64,7 @@ public class StreamPlan
     {
         this.streamOperation = streamOperation;
         this.coordinator = new StreamCoordinator(streamOperation, 
connectionsPerHost, new DefaultConnectionFactory(),
-                                                 connectSequentially, 
pendingRepair, previewKind);
+                                                 false, connectSequentially, 
pendingRepair, previewKind);
     }
 
     /**
@@ -176,7 +176,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, streamOperation, handlers, 
coordinator);
+        return StreamResultFuture.createInitiator(planId, streamOperation, 
handlers, coordinator);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 3268ecf..2b5791f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -68,19 +68,19 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
         this.coordinator = coordinator;
 
         // if there is no session to listen to, we immediately set result for 
returning
-        if (!coordinator.isReceiving() && !coordinator.hasActiveSessions())
+        if (!coordinator.isFollower() && !coordinator.hasActiveSessions())
             set(getCurrentState());
     }
 
     private StreamResultFuture(UUID planId, StreamOperation streamOperation, 
UUID pendingRepair, PreviewKind previewKind)
     {
-        this(planId, streamOperation, new StreamCoordinator(streamOperation, 
0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
+        this(planId, streamOperation, new StreamCoordinator(streamOperation, 
0, new DefaultConnectionFactory(), true, false, pendingRepair, previewKind));
     }
 
-    public static StreamResultFuture init(UUID planId, StreamOperation 
streamOperation, Collection<StreamEventHandler> listeners,
-                                   StreamCoordinator coordinator)
+    public static StreamResultFuture createInitiator(UUID planId, 
StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
+                                                     StreamCoordinator 
coordinator)
     {
-        StreamResultFuture future = createAndRegister(planId, streamOperation, 
coordinator);
+        StreamResultFuture future = createAndRegisterInitiator(planId, 
streamOperation, coordinator);
         if (listeners != null)
         {
             for (StreamEventHandler listener : listeners)
@@ -100,13 +100,13 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
         return future;
     }
 
-    public static synchronized StreamResultFuture initReceivingSide(int 
sessionIndex,
-                                                                    UUID 
planId,
-                                                                    
StreamOperation streamOperation,
-                                                                    
InetAddressAndPort from,
-                                                                    Channel 
channel,
-                                                                    UUID 
pendingRepair,
-                                                                    
PreviewKind previewKind)
+    public static synchronized StreamResultFuture createFollower(int 
sessionIndex,
+                                                                 UUID planId,
+                                                                 
StreamOperation streamOperation,
+                                                                 
InetAddressAndPort from,
+                                                                 Channel 
channel,
+                                                                 UUID 
pendingRepair,
+                                                                 PreviewKind 
previewKind)
     {
         StreamResultFuture future = 
StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -117,7 +117,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
 
             // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
             future = new StreamResultFuture(planId, streamOperation, 
pendingRepair, previewKind);
-            StreamManager.instance.registerReceiving(future);
+            StreamManager.instance.registerFollower(future);
         }
         future.attachConnection(from, sessionIndex, channel);
         logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from 
{} channel.remote {} channel.local {} channel.id {}",
@@ -125,10 +125,10 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
         return future;
     }
 
-    private static StreamResultFuture createAndRegister(UUID planId, 
StreamOperation streamOperation, StreamCoordinator coordinator)
+    private static StreamResultFuture createAndRegisterInitiator(UUID planId, 
StreamOperation streamOperation, StreamCoordinator coordinator)
     {
         StreamResultFuture future = new StreamResultFuture(planId, 
streamOperation, coordinator);
-        StreamManager.instance.register(future);
+        StreamManager.instance.registerInitiator(future);
         return future;
     }
 
@@ -141,7 +141,6 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
     {
         StreamSession session = coordinator.getOrCreateSessionById(from, 
sessionIndex);
         session.init(this);
-        session.attach(channel);
     }
 
     public void addEventListener(StreamEventHandler listener)
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 05bb5ff..c6ef5f0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -17,18 +17,18 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.io.EOFException;
 import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
-import com.google.common.util.concurrent.Futures;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.RangesAtEndpoint;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,10 +54,10 @@ import static 
org.apache.cassandra.net.MessagingService.current_version;
 
 /**
  * Handles the streaming a one or more streams to and from a specific remote 
node.
- *
+ *<p/>
  * Both this node and the remote one will create a similar symmetrical {@link 
StreamSession}. A streaming
  * session has the following life-cycle:
- *
+ *<pre>
  * 1. Session Initialization
  *
  *   (a) A node (the initiator in the following) create a new {@link 
StreamSession},
@@ -101,12 +101,15 @@ import static 
org.apache.cassandra.net.MessagingService.current_version;
  *
  * 4. Completion phase
  *
- *   (a) When a node enters the completion phase, it sends a {@link 
CompleteMessage} to the peer, and then enter the
- *       {@link StreamSession.State#WAIT_COMPLETE} state. If it has already 
received a {@link CompleteMessage}
- *       from the peer, session is complete and is then closed ({@link 
#closeSession(State)}). Otherwise, the node
- *       switch to the {@link StreamSession.State#WAIT_COMPLETE} state and 
send a {@link CompleteMessage} to the other side.
+ *   (a) When the initiator finishes streaming, it enters the {@link 
StreamSession.State#WAIT_COMPLETE} state, and waits
+ *       for the follower to send a {@link CompleteMessage} once it finishes 
streaming too. Once the {@link CompleteMessage}
+ *       is received, initiator sets its own state to {@link 
StreamSession.State#COMPLETE} and closes all channels attached
+ *       to this session.
+ *
+ * </pre>
  *
  * In brief, the message passing looks like this (I for initiator, F for 
follwer):
+ * <pre>
  * (session init)
  * I: StreamInitMessage
  * (session prepare)
@@ -117,7 +120,8 @@ import static 
org.apache.cassandra.net.MessagingService.current_version;
  * I: OutgoingStreamMessage
  * F: ReceivedMessage
  * (completion)
- * I/F: CompleteMessage
+ * F: CompleteMessage
+ *</pre>
  *
  * All messages which derive from {@link StreamMessage} are sent by the 
standard internode messaging
  * (via {@link org.apache.cassandra.net.MessagingService}, while the actual 
files themselves are sent by a special
@@ -127,6 +131,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
 
+    // for test purpose to record received message and state transition
+    public volatile static MessageStateSink sink = MessageStateSink.NONE;
+
     private final StreamOperation streamOperation;
 
     /**
@@ -153,43 +160,77 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new 
HashMap<>();
 
+    private final boolean isFollower;
     private final NettyStreamingMessageSender messageSender;
-    private final ConcurrentMap<ChannelId, Channel> incomingChannels = new 
ConcurrentHashMap<>();
+    // contains both inbound and outbound channels
+    private final ConcurrentMap<ChannelId, Channel> channels = new 
ConcurrentHashMap<>();
+
+    // "maybeCompleted()" should be executed at most once. Because it can be 
executed asynchronously by IO
+    // threads(serialization/deserialization) and stream messaging processing 
thread, causing connection closed before
+    // receiving peer's CompleteMessage.
+    private boolean maybeCompleted = false;
+    private Future closeFuture;
 
-    private final AtomicBoolean isAborted = new AtomicBoolean(false);
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
+    /**
+     * State Transition:
+     *
+     * <pre>
+     *  +------------------+----------> FAILED <--------------------+
+     *  |                  |              ^                         |
+     *  |                  |              |       initiator         |
+     *  INITIALIZED --> PREPARING --> STREAMING ------------> WAIT_COMPLETE 
----> COMPLETED
+     *  |                  |              |                         ^          
       ^
+     *  |                  |              |       follower          |          
       |
+     *  |                  |              
+-------------------------)-----------------+
+     *  |                  |                                        |          
       |
+     *  |                  |         if preview                     |          
       |
+     *  |                  +----------------------------------------+          
       |
+     *  |               nothing to request or to transfer                      
       |
+     *  
+-----------------------------------------------------------------------------+
+     *                  nothing to request or to transfer
+     *
+     *  </pre>
+     */
     public enum State
     {
-        INITIALIZED,
-        PREPARING,
-        STREAMING,
-        WAIT_COMPLETE,
-        COMPLETE,
-        FAILED,
+        INITIALIZED(false),
+        PREPARING(false),
+        STREAMING(false),
+        WAIT_COMPLETE(false),
+        COMPLETE(true),
+        FAILED(true);
+
+        private final boolean finalState;
+
+        State(boolean finalState)
+        {
+            this.finalState = finalState;
+        }
+
+        /**
+         * @return true if current state is final, either COMPLETE OR FAILED.
+         */
+        public boolean isFinalState()
+        {
+             return finalState;
+        }
     }
 
     private volatile State state = State.INITIALIZED;
-    private volatile boolean completeSent = false;
 
     /**
      * Create new streaming session with the peer.
      */
     public StreamSession(StreamOperation streamOperation, InetAddressAndPort 
peer, StreamConnectionFactory factory,
-                         int index, UUID pendingRepair, PreviewKind 
previewKind)
-    {
-        this(streamOperation, new OutboundConnectionSettings(peer), factory, 
index, pendingRepair, previewKind);
-    }
-    /**
-     * Create new streaming session with the peer.
-     */
-    public StreamSession(StreamOperation streamOperation, 
OutboundConnectionSettings template, StreamConnectionFactory factory,
-                         int index, UUID pendingRepair, PreviewKind 
previewKind)
+                         boolean isFollower, int index, UUID pendingRepair, 
PreviewKind previewKind)
     {
         this.streamOperation = streamOperation;
-        this.peer = template.to;
-        this.template = template;
+        this.peer = peer;
+        this.template = new OutboundConnectionSettings(peer);
+        this.isFollower = isFollower;
         this.index = index;
 
         this.messageSender = new NettyStreamingMessageSender(this, template, 
factory, current_version, previewKind.isPreview());
@@ -197,7 +238,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
 
-        logger.debug("Creating stream session to {}", template);
+        logger.debug("Creating stream session to {} as {}", template, 
isFollower ? "follower" : "initiator");
     }
 
     public UUID planId()
@@ -253,11 +294,46 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         StreamHook.instance.reportStreamFuture(this, streamResult);
     }
 
-    public boolean attach(Channel channel)
+    /**
+     * Attach a channel to this session upon receiving the first inbound 
message.
+     *
+     * @param channel The channel to attach.
+     * @param isControlChannel If the channel is the one to send control 
messages to.
+     * @return False if the channel was already attached, true otherwise.
+     */
+    public synchronized boolean attachInbound(Channel channel, boolean 
isControlChannel)
     {
-        if (!messageSender.hasControlChannel())
+        failIfFinished();
+
+        if (!messageSender.hasControlChannel() && isControlChannel)
             messageSender.injectControlMessageChannel(channel);
-        return incomingChannels.putIfAbsent(channel.id(), channel) == null;
+
+        channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+        return channels.putIfAbsent(channel.id(), channel) == null;
+    }
+
+    /**
+     * Attach a channel to this session upon sending the first outbound 
message.
+     *
+     * @param channel The channel to attach.
+     * @return False if the channel was already attached, true otherwise.
+     */
+    public synchronized boolean attachOutbound(Channel channel)
+    {
+        failIfFinished();
+
+        channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+        return channels.putIfAbsent(channel.id(), channel) == null;
+    }
+
+    /**
+     * On channel closing, if no channels are left just close the message 
sender; this must be closed last to ensure
+     * keep alive messages are sent until the very end of the streaming 
session.
+     */
+    private synchronized void onChannelClose(Channel channel)
+    {
+        if (channels.remove(channel.id()) != null && channels.isEmpty())
+            messageSender.close();
     }
 
     /**
@@ -339,7 +415,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     private void failIfFinished()
     {
-        if (state() == State.COMPLETE || state() == State.FAILED)
+        if (state().isFinalState())
             throw new RuntimeException(String.format("Stream %s is finished 
with state %s", planId(), state().name()));
     }
 
@@ -399,22 +475,32 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     private synchronized Future closeSession(State finalState)
     {
-        Future abortedTasksFuture = null;
-        if (isAborted.compareAndSet(false, true))
-        {
-            state(finalState);
+        // it's session is already closed
+        if (closeFuture != null)
+            return closeFuture;
 
-            // ensure aborting the tasks do not happen on the network IO 
thread (read: netty event loop)
-            // as we don't want any blocking disk IO to stop the network thread
-            if (finalState == State.FAILED)
-                abortedTasksFuture = 
ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
+        state(finalState);
 
-            incomingChannels.values().stream().map(channel -> channel.close());
-            messageSender.close();
+        List<Future> futures = new ArrayList<>();
 
-            streamResult.handleSessionComplete(this);
+        // ensure aborting the tasks do not happen on the network IO thread 
(read: netty event loop)
+        // as we don't want any blocking disk IO to stop the network thread
+        if (finalState == State.FAILED)
+            
futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
+
+        // Channels should only be closed by the initiator; but, if this 
session closed
+        // due to failure, channels should be always closed regardless, even 
if this is not the initator.
+        if (!isFollower || state != State.COMPLETE)
+        {
+            logger.debug("[Stream #{}] Will close attached channels {}", 
planId(), channels);
+            channels.values().forEach(channel -> futures.add(channel.close()));
         }
-        return abortedTasksFuture != null ? abortedTasksFuture : 
Futures.immediateFuture(null);
+
+        sink.onClose(peer);
+        streamResult.handleSessionComplete(this);
+        closeFuture = FBUtilities.allOf(futures);
+
+        return closeFuture;
     }
 
     private void abortTasks()
@@ -426,7 +512,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         }
         catch (Exception e)
         {
-            logger.warn("failed to abort some streaming tasks", e);
+            logger.warn("[Stream #{}] failed to abort some streaming tasks", 
planId(), e);
         }
     }
 
@@ -437,6 +523,10 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      */
     public void state(State newState)
     {
+        if (logger.isTraceEnabled())
+            logger.trace("[Stream #{}] Changing session state from {} to {}", 
planId(), state, newState);
+
+        sink.recordState(peer, newState);
         state = newState;
     }
 
@@ -463,21 +553,29 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return state == State.COMPLETE;
     }
 
-    public void messageReceived(StreamMessage message)
+    public synchronized void messageReceived(StreamMessage message)
     {
+        if (message.type != StreamMessage.Type.KEEP_ALIVE)
+            failIfFinished();
+
+        sink.recordMessage(peer, message.type);
+
         switch (message.type)
         {
             case STREAM_INIT:
-                // nop
+                // at follower, nop
                 break;
             case PREPARE_SYN:
+                // at follower
                 PrepareSynMessage msg = (PrepareSynMessage) message;
                 prepare(msg.requests, msg.summaries);
                 break;
             case PREPARE_SYNACK:
+                // at initiator
                 prepareSynAck((PrepareSynAckMessage) message);
                 break;
             case PREPARE_ACK:
+                // at follower
                 prepareAck((PrepareAckMessage) message);
                 break;
             case STREAM:
@@ -488,6 +586,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 received(received.tableId, received.sequenceNumber);
                 break;
             case COMPLETE:
+                // at initiator
                 complete();
                 break;
             case KEEP_ALIVE:
@@ -529,10 +628,32 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     }
 
     /**
-     * Call back for handling exception during streaming.
+     * Signal an error to this stream session: if it's an EOF exception, it 
tries to understand if the socket was closed
+     * after completion or because the peer was down, otherwise sends a {@link 
SessionFailedMessage} and closes
+     * the session as {@link State#FAILED}.
      */
-    public Future onError(Throwable e)
+    public synchronized Future onError(Throwable e)
     {
+        boolean isEofException = e instanceof EOFException;
+        if (isEofException)
+        {
+            if (state.finalState)
+            {
+                logger.debug("[Stream #{}] Socket closed after session 
completed with state {}", planId(), state);
+
+                return null;
+            }
+            else
+            {
+                logger.error("[Stream #{}] Socket closed before session 
completion, peer {} is probably down.",
+                             planId(),
+                             peer.address.getHostAddress(),
+                             e);
+
+                return closeSession(State.FAILED);
+            }
+        }
+
         logError(e);
         // send session failure message
         if (messageSender.connected())
@@ -577,7 +698,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      */
     private void prepareAsync(Collection<StreamRequest> requests, 
Collection<StreamSummary> summaries)
     {
-
         for (StreamRequest request : requests)
             addTransferRanges(request.keyspace, 
RangesAtEndpoint.concat(request.full, request.transientReplicas), 
request.columnFamilies, true); // always flush on stream request
         for (StreamSummary summary : summaries)
@@ -589,9 +709,12 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 prepareSynAck.summaries.add(task.getSummary());
         messageSender.sendMessage(prepareSynAck);
 
-
         streamResult.handleSessionPrepared(this);
-        maybeCompleted();
+
+        if (isPreview())
+            completePreview();
+        else
+            maybeCompleted();
     }
 
     private void prepareSynAck(PrepareSynAckMessage msg)
@@ -602,7 +725,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 prepareReceiving(summary);
 
             // only send the (final) ACK if we are expecting the peer to send 
this node (the initiator) some files
-            messageSender.sendMessage(new PrepareAckMessage());
+            if (!isPreview())
+                messageSender.sendMessage(new PrepareAckMessage());
         }
 
         if (isPreview())
@@ -614,9 +738,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     private void prepareAck(PrepareAckMessage msg)
     {
         if (isPreview())
-            completePreview();
-        else
-            startStreamingFiles(true);
+            throw new RuntimeException(String.format("[Stream #%s] Cannot 
receive PrepareAckMessage for preview session", planId()));
+        startStreamingFiles(true);
     }
 
     /**
@@ -646,7 +769,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     {
         if (isPreview())
         {
-            throw new RuntimeException("Cannot receive files for preview 
session");
+            throw new RuntimeException(String.format("[Stream #%s] Cannot 
receive files for preview session", planId()));
         }
 
         long headerSize = message.stream.getSize();
@@ -674,20 +797,49 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      */
     public synchronized void complete()
     {
-        logger.debug("handling Complete message, state = {}, completeSent = 
{}", state, completeSent);
-        if (state == State.WAIT_COMPLETE)
+        logger.debug("[Stream #{}] handling Complete message, state = {}", 
planId(), state);
+
+        if (!isFollower)
         {
-            if (!completeSent)
-            {
-                messageSender.sendMessage(new CompleteMessage());
-                completeSent = true;
-            }
-            closeSession(State.COMPLETE);
+            if (state == State.WAIT_COMPLETE)
+                closeSession(State.COMPLETE);
+            else
+                state(State.WAIT_COMPLETE);
         }
         else
         {
-            state(State.WAIT_COMPLETE);
+            // pre-4.0 nodes should not be connected via streaming, see {@link 
MessagingService#accept_streaming}
+            throw new IllegalStateException(String.format("[Stream #%s] 
Complete message can be only received by the initiator!", planId()));
+        }
+    }
+
+    /**
+     * Synchronize both {@link #complete()} and {@link #maybeCompleted()} to 
avoid racing
+     */
+    private synchronized boolean maybeCompleted()
+    {
+        if (!(receivers.isEmpty() && transfers.isEmpty()))
+            return false;
+
+        // if already executed once, skip it
+        if (maybeCompleted)
+            return true;
+
+        maybeCompleted = true;
+        if (!isFollower)
+        {
+            if (state == State.WAIT_COMPLETE)
+                closeSession(State.COMPLETE);
+            else
+                state(State.WAIT_COMPLETE);
+        }
+        else
+        {
+            messageSender.sendMessage(new CompleteMessage());
+            closeSession(State.COMPLETE);
         }
+
+        return true;
     }
 
     /**
@@ -760,31 +912,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         }
     }
 
-    private boolean maybeCompleted()
-    {
-        boolean completed = receivers.isEmpty() && transfers.isEmpty();
-        if (completed)
-        {
-            if (state == State.WAIT_COMPLETE)
-            {
-                if (!completeSent)
-                {
-                    messageSender.sendMessage(new CompleteMessage());
-                    completeSent = true;
-                }
-                closeSession(State.COMPLETE);
-            }
-            else
-            {
-                // notify peer that this session is completed
-                messageSender.sendMessage(new CompleteMessage());
-                completeSent = true;
-                state(State.WAIT_COMPLETE);
-            }
-        }
-        return completed;
-    }
-
     /**
      * Flushes matching column families from the given keyspace, or all 
columnFamilies
      * if the cf list is empty.
@@ -843,4 +970,43 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     {
         return transferredRangesPerKeyspace.size();
     }
+
+    @VisibleForTesting
+    public static interface MessageStateSink
+    {
+        static final MessageStateSink NONE = new MessageStateSink() {
+            @Override
+            public void recordState(InetAddressAndPort from, State state)
+            {
+            }
+
+            @Override
+            public void recordMessage(InetAddressAndPort from, 
StreamMessage.Type message)
+            {
+            }
+
+            @Override
+            public void onClose(InetAddressAndPort from)
+            {
+            }
+        };
+
+        /**
+         * @param from peer that is connected in the stream session
+         * @param state new state to change to
+         */
+        public void recordState(InetAddressAndPort from, StreamSession.State 
state);
+
+        /**
+         * @param from peer that sends the given message
+         * @param message stream message sent by peer
+         */
+        public void recordMessage(InetAddressAndPort from, StreamMessage.Type 
message);
+
+        /**
+         *
+         * @param from peer that is being disconnected
+         */
+        public void onClose(InetAddressAndPort from);
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 1314e1d..fba56f5 100644
--- 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -21,9 +21,8 @@ package org.apache.cassandra.streaming.async;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -33,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +61,7 @@ import 
org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
  * Responsible for sending {@link StreamMessage}s to a given peer. We manage 
an array of netty {@link Channel}s
@@ -102,7 +103,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
      * A special {@link Channel} for sending non-stream streaming messages, 
basically anything that isn't an
      * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but 
a node doesn't send that, it's only received).
      */
-    private Channel controlMessageChannel;
+    private volatile Channel controlMessageChannel;
 
     // note: this really doesn't need to be a LBQ, just something that's 
thread safe
     private final Collection<ScheduledFuture<?>> channelKeepAlives = new 
LinkedBlockingQueue<>();
@@ -153,6 +154,9 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         return controlMessageChannel != null;
     }
 
+    /**
+     * Used by follower to setup control message channel created by initiator
+     */
     public void injectControlMessageChannel(Channel channel)
     {
         this.controlMessageChannel = channel;
@@ -160,11 +164,20 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         scheduleKeepAliveTask(channel);
     }
 
+    /**
+     * Used by initiator to setup control message channel connecting to 
follower
+     */
     private void setupControlMessageChannel() throws IOException
     {
         if (controlMessageChannel == null)
         {
-            controlMessageChannel = createChannel();
+            /*
+             * Inbound handlers are needed:
+             *  a) for initiator's control channel(the first outbound channel) 
to receive follower's message.
+             *  b) for streaming receiver (note: both initiator and follower 
can receive streaming files) to reveive files,
+             *     in {@link Handler#setupStreamingPipeline}
+             */
+            controlMessageChannel = createChannel(true);
             scheduleKeepAliveTask(controlMessageChannel);
         }
     }
@@ -181,11 +194,16 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         task.future = scheduledFuture;
     }
     
-    private Channel createChannel() throws IOException
+    private Channel createChannel(boolean isInboundHandlerNeeded) throws 
IOException
     {
         Channel channel = factory.createConnection(template, streamingVersion);
-        ChannelPipeline pipeline = channel.pipeline();
-        pipeline.addLast("stream", new StreamingInboundHandler(template.to, 
streamingVersion, session));
+        session.attachOutbound(channel);
+
+        if (isInboundHandlerNeeded)
+        {
+            ChannelPipeline pipeline = channel.pipeline();
+            pipeline.addLast("stream", new 
StreamingInboundHandler(template.to, streamingVersion, session));
+        }
         channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
         logger.debug("Creating channel id {} local {} remote {}", 
channel.id(), channel.localAddress(), channel.remoteAddress());
         return channel;
@@ -316,9 +334,10 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
             if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
                 return;
 
+            Channel channel = null;
             try
             {
-                Channel channel = getOrCreateChannel();
+                channel = getOrCreateChannel();
                 if (!channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet(false, 
true))
                     throw new IllegalStateException("channel's transferring 
state is currently set to true. refusing to start new stream");
 
@@ -336,6 +355,19 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
             {
                 session.onError(e);
             }
+            catch (Throwable t)
+            {
+                if (closed && Throwables.getRootCause(t) instanceof 
ClosedByInterruptException && fileTransferExecutor.isShutdown())
+                {
+                    logger.debug("{} Streaming channel was closed due to the 
executor pool being shutdown", createLogTag(session, channel));
+                }
+                else
+                {
+                    JVMStabilityInspector.inspectThrowable(t);
+                    if (!session.state().isFinalState())
+                        session.onError(t);
+                }
+            }
             finally
             {
                 fileTransferSemaphore.release();
@@ -383,7 +415,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
                 if (channel != null)
                     return channel;
 
-                channel = createChannel();
+                channel = createChannel(false);
                 threadToChannelMap.put(currentThread, channel);
                 return channel;
             }
@@ -513,6 +545,9 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
     @Override
     public void close()
     {
+        if (closed)
+            return;
+
         closed = true;
         if (logger.isDebugEnabled())
             logger.debug("{} Closing stream connection channels on {}", 
createLogTag(session, null), template.to);
@@ -520,14 +555,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
             future.cancel(false);
         channelKeepAlives.clear();
 
-        List<Future<Void>> futures = new 
ArrayList<>(threadToChannelMap.size());
-        for (Channel channel : threadToChannelMap.values())
-            futures.add(channel.close());
-        FBUtilities.waitOnFutures(futures, 10, TimeUnit.SECONDS);
         threadToChannelMap.clear();
         fileTransferExecutor.shutdownNow();
-
-        if (controlMessageChannel != null)
-            controlMessageChannel.close();
     }
 }
diff --git 
a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java 
b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index a319fea..edc74e3 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -18,12 +18,9 @@
 
 package org.apache.cassandra.streaming.async;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +28,6 @@ import java.util.function.Function;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +40,6 @@ import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.AsyncStreamingInputPlus;
-import org.apache.cassandra.net.AsyncStreamingInputPlus.InputTimeoutException;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveException;
 import org.apache.cassandra.streaming.StreamResultFuture;
@@ -183,7 +178,7 @@ public class StreamingInboundHandler extends 
ChannelInboundHandlerAdapter
                         Uninterruptibles.sleepUninterruptibly(400, 
TimeUnit.MILLISECONDS);
                     }
 
-                    StreamMessage message = StreamMessage.deserialize(buffers, 
protocolVersion, null);
+                    StreamMessage message = StreamMessage.deserialize(buffers, 
protocolVersion);
 
                     // keep-alives don't necessarily need to be tied to a 
session (they could be arrive before or after
                     // wrt session lifecycle, due to races), just log that we 
received the message and carry on
@@ -203,10 +198,6 @@ public class StreamingInboundHandler extends 
ChannelInboundHandlerAdapter
                     session.messageReceived(message);
                 }
             }
-            catch (InputTimeoutException | EOFException e)
-            {
-                // ignore
-            }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);
@@ -248,7 +239,7 @@ public class StreamingInboundHandler extends 
ChannelInboundHandlerAdapter
             {
                 assert session == null : "initiator of stream session received 
a StreamInitMessage";
                 StreamInitMessage init = (StreamInitMessage) message;
-                StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.streamOperation, init.from, channel, init.pendingRepair, 
init.previewKind);
+                StreamResultFuture.createFollower(init.sessionIndex, 
init.planId, init.streamOperation, init.from, channel, init.pendingRepair, 
init.previewKind);
                 streamSession = sessionProvider.apply(new 
SessionIdentifier(init.from, init.planId, init.sessionIndex));
             }
             else if (message instanceof IncomingStreamMessage)
@@ -262,7 +253,9 @@ public class StreamingInboundHandler extends 
ChannelInboundHandlerAdapter
             if (streamSession == null)
                 throw new IllegalStateException(createLogTag(null, channel) + 
" no session found for message " + message);
 
-            streamSession.attach(channel);
+            // Attach this channel to the session: this only happens upon 
receiving the first init message as a follower;
+            // in all other cases, no new control channel will be added, as 
the proper control channel will be already attached.
+            streamSession.attachInbound(channel, message instanceof 
StreamInitMessage);
             return streamSession;
         }
     }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index 81e16f7..83d95e0 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -25,7 +25,7 @@ public class CompleteMessage extends StreamMessage
 {
     public static Serializer<CompleteMessage> serializer = new 
Serializer<CompleteMessage>()
     {
-        public CompleteMessage deserialize(DataInputPlus in, int version, 
StreamSession session)
+        public CompleteMessage deserialize(DataInputPlus in, int version)
         {
             return new CompleteMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index e17c3ab..5f69a90 100644
--- 
a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++ 
b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -35,10 +35,10 @@ public class IncomingStreamMessage extends StreamMessage
     public static Serializer<IncomingStreamMessage> serializer = new 
Serializer<IncomingStreamMessage>()
     {
         @SuppressWarnings("resource")
-        public IncomingStreamMessage deserialize(DataInputPlus input, int 
version, StreamSession session) throws IOException
+        public IncomingStreamMessage deserialize(DataInputPlus input, int 
version) throws IOException
         {
             StreamMessageHeader header = 
StreamMessageHeader.serializer.deserialize(input, version);
-            session = StreamManager.instance.findSession(header.sender, 
header.planId, header.sessionIndex);
+            StreamSession session = 
StreamManager.instance.findSession(header.sender, header.planId, 
header.sessionIndex);
             if (session == null)
                 throw new IllegalStateException(String.format("unknown stream 
session: %s - %d", header.planId, header.sessionIndex));
             ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(header.tableId);
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index f80c617..5352b3b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -28,7 +28,7 @@ public class KeepAliveMessage extends StreamMessage
 {
     public static Serializer<KeepAliveMessage> serializer = new 
Serializer<KeepAliveMessage>()
     {
-        public KeepAliveMessage deserialize(DataInputPlus in, int version, 
StreamSession session) throws IOException
+        public KeepAliveMessage deserialize(DataInputPlus in, int version) 
throws IOException
         {
             return new KeepAliveMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
index 263aabd..8406f80 100644
--- 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
+++ 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -32,7 +32,7 @@ public class OutgoingStreamMessage extends StreamMessage
 {
     public static Serializer<OutgoingStreamMessage> serializer = new 
Serializer<OutgoingStreamMessage>()
     {
-        public OutgoingStreamMessage deserialize(DataInputPlus in, int 
version, StreamSession session)
+        public OutgoingStreamMessage deserialize(DataInputPlus in, int version)
         {
             throw new UnsupportedOperationException("Not allowed to call 
deserialize on an outgoing stream");
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
index f43ff01..97fdff7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
@@ -33,7 +33,7 @@ public class PrepareAckMessage extends StreamMessage
             //nop
         }
 
-        public PrepareAckMessage deserialize(DataInputPlus in, int version, 
StreamSession session) throws IOException
+        public PrepareAckMessage deserialize(DataInputPlus in, int version) 
throws IOException
         {
             return new PrepareAckMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
index 2d8026c..4e5e8fb 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
@@ -38,7 +38,7 @@ public class PrepareSynAckMessage extends StreamMessage
                 StreamSummary.serializer.serialize(summary, out, version);
         }
 
-        public PrepareSynAckMessage deserialize(DataInputPlus input, int 
version, StreamSession session) throws IOException
+        public PrepareSynAckMessage deserialize(DataInputPlus input, int 
version) throws IOException
         {
             PrepareSynAckMessage message = new PrepareSynAckMessage();
             int numSummaries = input.readInt();
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
index 6fbaafa..e378af7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
@@ -31,7 +31,7 @@ public class PrepareSynMessage extends StreamMessage
 {
     public static Serializer<PrepareSynMessage> serializer = new 
Serializer<PrepareSynMessage>()
     {
-        public PrepareSynMessage deserialize(DataInputPlus input, int version, 
StreamSession session) throws IOException
+        public PrepareSynMessage deserialize(DataInputPlus input, int version) 
throws IOException
         {
             PrepareSynMessage message = new PrepareSynMessage();
             // requests
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 3988dcc..ff2cdec 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -29,7 +29,7 @@ public class ReceivedMessage extends StreamMessage
     public static Serializer<ReceivedMessage> serializer = new 
Serializer<ReceivedMessage>()
     {
         @SuppressWarnings("resource") // Not closing constructed 
DataInputPlus's as the channel needs to remain open.
-        public ReceivedMessage deserialize(DataInputPlus input, int version, 
StreamSession session) throws IOException
+        public ReceivedMessage deserialize(DataInputPlus input, int version) 
throws IOException
         {
             return new ReceivedMessage(TableId.deserialize(input), 
input.readInt());
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index 59ad90e..ca10bcc 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -25,7 +25,7 @@ public class SessionFailedMessage extends StreamMessage
 {
     public static Serializer<SessionFailedMessage> serializer = new 
Serializer<SessionFailedMessage>()
     {
-        public SessionFailedMessage deserialize(DataInputPlus in, int version, 
StreamSession session)
+        public SessionFailedMessage deserialize(DataInputPlus in, int version)
         {
             return new SessionFailedMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index e148790..953f2c4 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -86,7 +86,7 @@ public class StreamInitMessage extends StreamMessage
             out.writeInt(message.previewKind.getSerializationVal());
         }
 
-        public StreamInitMessage deserialize(DataInputPlus in, int version, 
StreamSession session) throws IOException
+        public StreamInitMessage deserialize(DataInputPlus in, int version) 
throws IOException
         {
             InetAddressAndPort from = 
inetAddressAndPortSerializer.deserialize(in, version);
             int sessionIndex = in.readInt();
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 72e180c..2f42f1b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -43,16 +43,16 @@ public abstract class StreamMessage
         return 1 + message.type.outSerializer.serializedSize(message, version);
     }
 
-    public static StreamMessage deserialize(DataInputPlus in, int version, 
StreamSession session) throws IOException
+    public static StreamMessage deserialize(DataInputPlus in, int version) 
throws IOException
     {
         Type type = Type.lookupById(in.readByte());
-        return type.inSerializer.deserialize(in, version, session);
+        return type.inSerializer.deserialize(in, version);
     }
 
     /** StreamMessage serializer */
     public static interface Serializer<V extends StreamMessage>
     {
-        V deserialize(DataInputPlus in, int version, StreamSession session) 
throws IOException;
+        V deserialize(DataInputPlus in, int version) throws IOException;
         void serialize(V message, DataOutputStreamPlus out, int version, 
StreamSession session) throws IOException;
         long serializedSize(V message, int version) throws IOException;
     }
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 1df84ab..115cd43 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -486,6 +486,81 @@ public class FBUtilities
             Uninterruptibles.sleepUninterruptibly(delay, 
TimeUnit.MILLISECONDS);
         }
     }
+
+    /**
+     * Returns a new {@link Future} wrapping the given list of futures and 
returning a list of their results.
+     */
+    public static Future<List> allOf(Collection<Future> futures)
+    {
+        if (futures.isEmpty())
+            return CompletableFuture.completedFuture(null);
+
+        return new Future<List>()
+        {
+            @Override
+            @SuppressWarnings("unchecked")
+            public List get() throws InterruptedException, ExecutionException
+            {
+                List result = new ArrayList<>(futures.size());
+                for (Future current : futures)
+                {
+                    result.add(current.get());
+                }
+                return result;
+            }
+
+            @Override
+            @SuppressWarnings("unchecked")
+            public List get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException
+            {
+                List result = new ArrayList<>(futures.size());
+                long deadline = System.nanoTime() + 
TimeUnit.NANOSECONDS.convert(timeout, unit);
+                for (Future current : futures)
+                {
+                    long remaining = deadline - System.nanoTime();
+                    if (remaining <= 0)
+                        throw new TimeoutException();
+
+                    result.add(current.get(remaining, TimeUnit.NANOSECONDS));
+                }
+                return result;
+            }
+
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning)
+            {
+                for (Future current : futures)
+                {
+                    if (!current.cancel(mayInterruptIfRunning))
+                        return false;
+                }
+                return true;
+            }
+
+            @Override
+            public boolean isCancelled()
+            {
+                for (Future current : futures)
+                {
+                    if (!current.isCancelled())
+                        return false;
+                }
+                return true;
+            }
+
+            @Override
+            public boolean isDone()
+            {
+                for (Future current : futures)
+                {
+                    if (!current.isDone())
+                        return false;
+                }
+                return true;
+            }
+        };
+    }
+
     /**
      * Create a new instance of a partitioner defined in an SSTable Descriptor
      * @param desc Descriptor of an sstable
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index bafd03d..956f21e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@ -18,16 +18,39 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessage;
 
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.streaming.StreamSession.State.PREPARING;
+import static org.apache.cassandra.streaming.StreamSession.State.STREAMING;
+import static org.apache.cassandra.streaming.StreamSession.State.WAIT_COMPLETE;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_ACK;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYN;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYNACK;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.RECEIVED;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM_INIT;
 
 public class StreamingTest extends TestBaseImpl
 {
@@ -51,6 +74,9 @@ public class StreamingTest extends TestBaseImpl
                 Assert.assertEquals(0, results.length);
             }
 
+            // collect message and state
+            registerSink(cluster, nodes);
+
             cluster.get(nodes).runOnInstance(() -> 
StorageService.instance.rebuild(null, KEYSPACE, null, null));
             {
                 Object[][] results = 
cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM 
%s.cf;", KEYSPACE));
@@ -72,4 +98,85 @@ public class StreamingTest extends TestBaseImpl
         testStreaming(2, 2, 1000, "LeveledCompactionStrategy");
     }
 
+    public static void registerSink(Cluster cluster, int initiatorNodeId)
+    {
+        IInvokableInstance initiatorNode = cluster.get(initiatorNodeId);
+        InetSocketAddress initiator = initiatorNode.broadcastAddress();
+        MessageStateSinkImpl initiatorSink = new MessageStateSinkImpl();
+
+        for (int node = 1; node <= cluster.size(); node++)
+        {
+            if (initiatorNodeId == node)
+                continue;
+
+            IInvokableInstance followerNode = cluster.get(node);
+            InetSocketAddress follower = followerNode.broadcastAddress();
+
+            // verify on initiator's stream session
+            initiatorSink.messages(follower, Arrays.asList(PREPARE_SYNACK, 
STREAM, StreamMessage.Type.COMPLETE));
+            initiatorSink.states(follower, Arrays.asList(PREPARING, STREAMING, 
WAIT_COMPLETE, StreamSession.State.COMPLETE));
+
+            // verify on follower's stream session
+            MessageStateSinkImpl followerSink = new MessageStateSinkImpl();
+            followerSink.messages(initiator, Arrays.asList(STREAM_INIT, 
PREPARE_SYN, PREPARE_ACK, RECEIVED));
+            followerSink.states(initiator,  Arrays.asList(PREPARING, 
STREAMING, StreamSession.State.COMPLETE));
+            followerNode.runOnInstance(() -> StreamSession.sink = 
followerSink);
+        }
+
+        cluster.get(initiatorNodeId).runOnInstance(() -> StreamSession.sink = 
initiatorSink);
+    }
+
+    @VisibleForTesting
+    public static class MessageStateSinkImpl implements 
StreamSession.MessageStateSink, Serializable
+    {
+        // use enum ordinal instead of enum to walk around inter-jvm class 
loader issue, only classes defined in
+        // InstanceClassLoader#sharedClassNames are shareable between server 
jvm and test jvm
+        public final Map<InetAddress, Queue<Integer>> messageSink = new 
ConcurrentHashMap<>();
+        public final Map<InetAddress, Queue<Integer>> stateTransitions = new 
ConcurrentHashMap<>();
+
+        public void messages(InetSocketAddress peer, List<StreamMessage.Type> 
messages)
+        {
+            messageSink.put(peer.getAddress(), 
messages.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new)));
+        }
+
+        public void states(InetSocketAddress peer, List<StreamSession.State> 
states)
+        {
+            stateTransitions.put(peer.getAddress(), 
states.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new)));
+        }
+
+        @Override
+        public void recordState(InetAddressAndPort from, StreamSession.State 
state)
+        {
+            Queue<Integer> states = stateTransitions.get(from.address);
+            if (states.peek() == null)
+                Assert.fail("Unexpected state " + state);
+
+            int expected = states.poll();
+            Assert.assertEquals(StreamSession.State.values()[expected], state);
+        }
+
+        @Override
+        public void recordMessage(InetAddressAndPort from, StreamMessage.Type 
message)
+        {
+            if (message == StreamMessage.Type.KEEP_ALIVE)
+                return;
+
+            Queue<Integer> messages = messageSink.get(from.address);
+            if (messages.peek() == null)
+                Assert.fail("Unexpected message " + message);
+
+            int expected = messages.poll();
+            Assert.assertEquals(StreamMessage.Type.values()[expected], 
message);
+        }
+
+        @Override
+        public void onClose(InetAddressAndPort from)
+        {
+            Queue<Integer> states = stateTransitions.get(from.address);
+            Assert.assertTrue("Missing states: " + states, states.isEmpty());
+
+            Queue<Integer> messages = messageSink.get(from.address);
+            Assert.assertTrue("Missing messages: " + messages, 
messages.isEmpty());
+        }
+    }
 }
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 2b642a8..744750e 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -213,8 +213,8 @@ public class ZeroCopyStreamingBenchmark
 
         private StreamSession setupStreamingSessionForTest()
         {
-            StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, null, PreviewKind.NONE);
-            StreamResultFuture future = 
StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, 
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+            StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, false, null, PreviewKind.NONE);
+            StreamResultFuture future = 
StreamResultFuture.createInitiator(UUID.randomUUID(), 
StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
 
             InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
             streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED));
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index c722738..b8115f4 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -196,8 +196,8 @@ public class CassandraEntireSSTableStreamWriterTest
 
     private StreamSession setupStreamingSessionForTest()
     {
-        StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, null, PreviewKind.NONE);
-        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), 
StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
+        StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, false, null, PreviewKind.NONE);
+        StreamResultFuture future = 
StreamResultFuture.createInitiator(UUID.randomUUID(), 
StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
 
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
         streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED));
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java 
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index eb15e9a..ae3ff92 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -99,6 +99,7 @@ public class CassandraStreamManagerTest
             return new StreamSession(StreamOperation.REPAIR,
                                      InetAddressAndPort.getByName("127.0.0.1"),
                                      connectionFactory,
+                                     false,
                                      0,
                                      pendingRepair,
                                      PreviewKind.NONE);
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java 
b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index 61adb58..b18d249 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -55,7 +55,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), 
factory.fromString("100"));
 
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
local, new DefaultConnectionFactory(), false, 0, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", 
RangesAtEndpoint.toDummyList(Collections.singleton(range)), 
RangesAtEndpoint.toDummyList(Collections.emptyList()), 
Collections.singleton("cf"));
 
         StreamStateStore store = new StreamStateStore();
@@ -76,7 +76,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), 
factory.fromString("200"));
-        session = new StreamSession(StreamOperation.BOOTSTRAP, local, new 
DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
+        session = new StreamSession(StreamOperation.BOOTSTRAP, local, new 
DefaultConnectionFactory(), false, 0, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", 
RangesAtEndpoint.toDummyList(Collections.singleton(range2)), 
RangesAtEndpoint.toDummyList(Collections.emptyList()), 
Collections.singleton("cf"));
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
diff --git 
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
 
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index a57fcbc..262a200 100644
--- 
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -187,13 +187,14 @@ public class EntireSSTableStreamingCorrectFilesCountTest
                                                                     1,
                                                                     new 
DefaultConnectionFactory(),
                                                                     false,
+                                                                    false,
                                                                     null,
                                                                     
PreviewKind.NONE);
 
-        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(),
-                                                            
StreamOperation.BOOTSTRAP,
-                                                            
Collections.singleton(streamEventHandler),
-                                                            streamCoordinator);
+        StreamResultFuture future = 
StreamResultFuture.createInitiator(UUID.randomUUID(),
+                                                                       
StreamOperation.BOOTSTRAP,
+                                                                       
Collections.singleton(streamEventHandler),
+                                                                       
streamCoordinator);
 
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
         streamCoordinator.addSessionInfo(new SessionInfo(peer,
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 2f4feff..0bf7f20 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -77,7 +77,7 @@ public class StreamTransferTaskTest
     public void testScheduleTimeout() throws Exception
     {
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, (template, messagingVersion) -> new EmbeddedChannel(), 0, 
UUID.randomUUID(), PreviewKind.ALL);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, (template, messagingVersion) -> new EmbeddedChannel(), false, 0, 
UUID.randomUUID(), PreviewKind.ALL);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
@@ -88,6 +88,7 @@ public class StreamTransferTaskTest
         }
 
         // create streaming task that streams those two sstables
+        session.state(StreamSession.State.PREPARING);
         StreamTransferTask task = new StreamTransferTask(session, 
cfs.metadata.id);
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
@@ -98,6 +99,7 @@ public class StreamTransferTaskTest
         assertEquals(14, task.getTotalNumberOfFiles());
 
         // if file sending completes before timeout then the task should be 
canceled.
+        session.state(StreamSession.State.STREAMING);
         Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS);
         f.get();
 
@@ -123,9 +125,9 @@ public class StreamTransferTaskTest
     public void testFailSessionDuringTransferShouldNotReleaseReferences() 
throws Exception
     {
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, null, PreviewKind.NONE);
-        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), 
StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
-        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, null, 0, null, PreviewKind.NONE);
+        StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, false, null, PreviewKind.NONE);
+        StreamResultFuture future = 
StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.OTHER, 
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, null, false, 0, null, PreviewKind.NONE);
         session.init(future);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
index 957869b..76bfa76 100644
--- 
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -63,9 +63,11 @@ public class NettyStreamingMessageSenderTest
         channel = new TestChannel(Integer.MAX_VALUE);
         
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
         UUID pendingRepair = UUID.randomUUID();
-        session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, 
(template, messagingVersion) -> null, 0, pendingRepair, PreviewKind.ALL);
-        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, 
UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, 
session.getPreviewKind());
+        session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, 
(template, messagingVersion) -> null, true, 0, pendingRepair, PreviewKind.ALL);
+        StreamResultFuture future = StreamResultFuture.createFollower(0, 
UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, 
session.getPreviewKind());
         session.init(future);
+        session.attachOutbound(channel);
+
         sender = session.getMessageSender();
         sender.setControlMessageChannel(channel);
     }
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index 6a2afe8..11f6757 100644
--- 
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -127,7 +127,7 @@ public class StreamingInboundHandlerTest
 
     private StreamSession createSession(SessionIdentifier sid)
     {
-        return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, 
(template, messagingVersion) -> null, sid.sessionIndex, UUID.randomUUID(), 
PreviewKind.ALL);
+        return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, 
(template, messagingVersion) -> null, true, sid.sessionIndex, 
UUID.randomUUID(), PreviewKind.ALL);
     }
 
     @Test (expected = IllegalStateException.class)
@@ -152,8 +152,8 @@ public class StreamingInboundHandlerTest
     public void StreamDeserializingTask_deriveSession_IFM_HasSession()
     {
         UUID planId = UUID.randomUUID();
-        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, 
planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(), 
PreviewKind.ALL);
-        StreamManager.instance.register(future);
+        StreamResultFuture future = StreamResultFuture.createFollower(0, 
planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(), 
PreviewKind.ALL);
+        StreamManager.instance.registerFollower(future);
         StreamMessageHeader header = new 
StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0,
                                                              0, 0, 
UUID.randomUUID());
         IncomingStreamMessage msg = new IncomingStreamMessage(null, header);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to