This is an automated email from the ASF dual-hosted git repository. blerer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 9f6fcc3 Avoid race condition when completing stream sessions 9f6fcc3 is described below commit 9f6fcc340d89eecc000765f6ab93e862f53a02d9 Author: Zhao Yang <zhaoyangsingap...@gmail.com> AuthorDate: Fri Mar 20 15:56:53 2020 +0800 Avoid race condition when completing stream sessions patch by ZhaoYang; reviewed by Sergio Bossa and Benjamin Lerer for CASSANDRA-15666 --- .circleci/config.yml | 174 +++++----- CHANGES.txt | 1 + .../streaming/CassandraCompressedStreamWriter.java | 2 +- .../db/streaming/CassandraStreamWriter.java | 2 +- .../org/apache/cassandra/io/util/ChannelProxy.java | 10 + .../apache/cassandra/net/OutboundConnection.java | 4 +- .../org/apache/cassandra/service/QueryState.java | 2 +- .../cassandra/streaming/StreamCoordinator.java | 15 +- .../apache/cassandra/streaming/StreamManager.java | 28 +- .../org/apache/cassandra/streaming/StreamPlan.java | 4 +- .../cassandra/streaming/StreamResultFuture.java | 31 +- .../apache/cassandra/streaming/StreamSession.java | 352 +++++++++++++++------ .../async/NettyStreamingMessageSender.java | 60 +++- .../streaming/async/StreamingInboundHandler.java | 17 +- .../streaming/messages/CompleteMessage.java | 2 +- .../streaming/messages/IncomingStreamMessage.java | 4 +- .../streaming/messages/KeepAliveMessage.java | 2 +- .../streaming/messages/OutgoingStreamMessage.java | 2 +- .../streaming/messages/PrepareAckMessage.java | 2 +- .../streaming/messages/PrepareSynAckMessage.java | 2 +- .../streaming/messages/PrepareSynMessage.java | 2 +- .../streaming/messages/ReceivedMessage.java | 2 +- .../streaming/messages/SessionFailedMessage.java | 2 +- .../streaming/messages/StreamInitMessage.java | 2 +- .../streaming/messages/StreamMessage.java | 6 +- .../org/apache/cassandra/utils/FBUtilities.java | 75 +++++ .../cassandra/distributed/test/StreamingTest.java | 107 +++++++ .../microbench/ZeroCopyStreamingBenchmark.java | 4 +- 
.../CassandraEntireSSTableStreamWriterTest.java | 4 +- .../db/streaming/CassandraStreamManagerTest.java | 1 + .../apache/cassandra/dht/StreamStateStoreTest.java | 4 +- ...ntireSSTableStreamingCorrectFilesCountTest.java | 9 +- .../streaming/StreamTransferTaskTest.java | 10 +- .../async/NettyStreamingMessageSenderTest.java | 6 +- .../async/StreamingInboundHandlerTest.java | 6 +- 35 files changed, 673 insertions(+), 283 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 7f6c93b..7231d67 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,7 +3,7 @@ jobs: j8_jvm_upgrade_dtests: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -85,8 +85,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -94,7 +94,7 @@ jobs: j8_cqlsh-dtests-py2-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -162,8 +162,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -171,7 +171,7 @@ jobs: j11_unit_tests: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - 
resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -253,8 +253,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -263,7 +263,7 @@ jobs: j8_cqlsh-dtests-py38-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -331,8 +331,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -340,7 +340,7 @@ jobs: j11_cqlsh-dtests-py3-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -408,8 +408,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -418,7 +418,7 @@ jobs: j11_cqlsh-dtests-py3-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + 
resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -486,8 +486,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -496,7 +496,7 @@ jobs: j11_cqlsh-dtests-py38-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -564,8 +564,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -574,7 +574,7 @@ jobs: j8_cqlsh-dtests-py3-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -642,8 +642,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -651,7 +651,7 @@ jobs: j8_cqlsh-dtests-py2-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge 
working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -719,8 +719,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -728,7 +728,7 @@ jobs: j11_cqlsh-dtests-py2-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -796,8 +796,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -806,7 +806,7 @@ jobs: j11_dtests-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -877,8 +877,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -887,7 +887,7 @@ jobs: j8_dtests-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l 
parallelism: 4 @@ -936,8 +936,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -945,7 +945,7 @@ jobs: j8_upgradetests-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1035,8 +1035,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1044,7 +1044,7 @@ jobs: utests_stress: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1080,8 +1080,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1089,7 +1089,7 @@ jobs: j8_unit_tests: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1171,8 +1171,8 @@ jobs: - 
CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1180,7 +1180,7 @@ jobs: j11_jvm_dtests: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1262,8 +1262,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -1272,7 +1272,7 @@ jobs: j11_build: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1343,8 +1343,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -1353,7 +1353,7 @@ jobs: j11_cqlsh-dtests-py2-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1421,8 +1421,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - 
CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -1431,7 +1431,7 @@ jobs: j8_dtests-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1480,8 +1480,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1489,7 +1489,7 @@ jobs: j11_cqlsh-dtests-py38-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1557,8 +1557,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -1567,7 +1567,7 @@ jobs: j8_jvm_dtests: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1649,8 +1649,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - 
DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1658,7 +1658,7 @@ jobs: j8_build: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1729,8 +1729,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1738,7 +1738,7 @@ jobs: j8_cqlsh-dtests-py3-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1806,8 +1806,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1815,7 +1815,7 @@ jobs: j8_cqlsh-dtests-py38-with-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1883,8 +1883,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: 
git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1892,7 +1892,7 @@ jobs: utests_long: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1928,8 +1928,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1937,7 +1937,7 @@ jobs: utests_fqltool: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1973,8 +1973,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -1982,7 +1982,7 @@ jobs: j11_dtests-no-vnodes: docker: - image: nastra/cassandra-testing-ubuntu1910-java11:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -2053,8 +2053,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: 
master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64 @@ -2063,7 +2063,7 @@ jobs: utests_compression: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -2145,8 +2145,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 @@ -2154,7 +2154,7 @@ jobs: j8_dtest_jars_build: docker: - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -2213,8 +2213,8 @@ jobs: - CASS_DRIVER_NO_EXTENSIONS: true - CASS_DRIVER_NO_CYTHON: true - CASSANDRA_SKIP_SYNC: true - - DTEST_REPO: git://github.com/apache/cassandra-dtest.git - - DTEST_BRANCH: master + - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git + - DTEST_BRANCH: CASSANDRA-15666 - CCM_MAX_HEAP_SIZE: 1024M - CCM_HEAP_NEWSIZE: 256M - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 diff --git a/CHANGES.txt b/CHANGES.txt index b9c8f8d..cfc1f4c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * Avoid race condition when completing stream sessions (CASSANDRA-15666) * Flush with fast compressors by default (CASSANDRA-15379) * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637) * Allow sending Entire SSTables over SSL (CASSANDRA-15740) diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java index 21406b2..d92314b 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java @@ -62,7 +62,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter long totalSize = totalSize(); logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); - try (ChannelProxy fc = sstable.getDataChannel().sharedCopy()) + try (ChannelProxy fc = sstable.getDataChannel().newChannel()) { long progress = 0L; diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java index ffc663d..8382f0a 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java @@ -83,7 +83,7 @@ public class CassandraStreamWriter sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); AsyncStreamingOutputPlus out = (AsyncStreamingOutputPlus) output; - try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy(); + try(ChannelProxy proxy = sstable.getDataChannel().newChannel(); ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists() ? 
DataIntegrityMetadata.checksumValidator(sstable.descriptor) : null) diff --git a/src/java/org/apache/cassandra/io/util/ChannelProxy.java b/src/java/org/apache/cassandra/io/util/ChannelProxy.java index 91bb03b..9ff46b7 100644 --- a/src/java/org/apache/cassandra/io/util/ChannelProxy.java +++ b/src/java/org/apache/cassandra/io/util/ChannelProxy.java @@ -111,6 +111,16 @@ public final class ChannelProxy extends SharedCloseableImpl } } + /** + * {@link #sharedCopy()} can not be used if thread will be interruped, as the backing channel will be closed. + * + * @return a new channel instance + */ + public final ChannelProxy newChannel() + { + return new ChannelProxy(filePath); + } + public ChannelProxy sharedCopy() { return new ChannelProxy(this); diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 0503259..b84ebc3 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -1561,8 +1561,8 @@ public class OutboundConnection Established established = state.established(); Channel channel = established.channel; OutboundConnectionSettings settings = established.settings; - return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.remoteAddress(), - settings.to, (InetSocketAddress) channel.localAddress(), + return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.localAddress(), + settings.to, (InetSocketAddress) channel.remoteAddress(), type, channel.id().asShortText()); } diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index 26f58bf..adb13b5 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -68,7 +68,7 @@ public class QueryState public long getTimestamp() { if (timestamp == Long.MIN_VALUE) - timestamp = clientState.getTimestamp(); + 
timestamp = ClientState.getTimestamp(); return timestamp; } diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 6d757b6..c8ebabb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -42,17 +42,19 @@ public class StreamCoordinator private final Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>(); private final StreamOperation streamOperation; private final int connectionsPerHost; + private final boolean follower; private StreamConnectionFactory factory; private Iterator<StreamSession> sessionsToConnect = null; private final UUID pendingRepair; private final PreviewKind previewKind; public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory, - boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) + boolean follower, boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) { this.streamOperation = streamOperation; this.connectionsPerHost = connectionsPerHost; this.factory = factory; + this.follower = follower; this.connectSequentially = connectSequentially; this.pendingRepair = pendingRepair; this.previewKind = previewKind; @@ -86,9 +88,9 @@ public class StreamCoordinator return results; } - public boolean isReceiving() + public boolean isFollower() { - return connectionsPerHost == 0; + return follower; } public void connect(StreamResultFuture future) @@ -272,8 +274,7 @@ public class StreamCoordinator { for (StreamSession session : streamSessions.values()) { - StreamSession.State state = session.state(); - if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED) + if (!session.state().isFinalState()) return true; } return false; @@ -284,7 +285,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - 
StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), + StreamSession session = new StreamSession(streamOperation, peer, factory, isFollower(), streamSessions.size(), pendingRepair, previewKind); streamSessions.put(++lastReturned, session); return session; @@ -317,7 +318,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(streamOperation, peer, factory, id, pendingRepair, previewKind); + session = new StreamSession(streamOperation, peer, factory, isFollower(), id, pendingRepair, previewKind); streamSessions.put(id, session); } return session; diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index 81c65c5..da77ad2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -106,12 +106,12 @@ public class StreamManager implements StreamManagerMBean * We manage them in two different maps to distinguish plan from initiated ones to * receiving ones withing the same JVM. 
*/ - private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap<>(); - private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap<>(); + private final Map<UUID, StreamResultFuture> initiatorStreams = new NonBlockingHashMap<>(); + private final Map<UUID, StreamResultFuture> followerStreams = new NonBlockingHashMap<>(); public Set<CompositeData> getCurrentStreams() { - return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, CompositeData>() + return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatorStreams.values(), followerStreams.values()), new Function<StreamResultFuture, CompositeData>() { public CompositeData apply(StreamResultFuture input) { @@ -120,7 +120,7 @@ public class StreamManager implements StreamManagerMBean })); } - public void register(final StreamResultFuture result) + public void registerInitiator(final StreamResultFuture result) { result.addEventListener(notifier); // Make sure we remove the stream on completion (whether successful or not) @@ -128,14 +128,14 @@ public class StreamManager implements StreamManagerMBean { public void run() { - initiatedStreams.remove(result.planId); + initiatorStreams.remove(result.planId); } }, MoreExecutors.directExecutor()); - initiatedStreams.put(result.planId, result); + initiatorStreams.put(result.planId, result); } - public StreamResultFuture registerReceiving(final StreamResultFuture result) + public StreamResultFuture registerFollower(final StreamResultFuture result) { result.addEventListener(notifier); // Make sure we remove the stream on completion (whether successful or not) @@ -143,17 +143,17 @@ public class StreamManager implements StreamManagerMBean { public void run() { - receivingStreams.remove(result.planId); + followerStreams.remove(result.planId); } }, MoreExecutors.directExecutor()); - StreamResultFuture previous = 
receivingStreams.putIfAbsent(result.planId, result); + StreamResultFuture previous = followerStreams.putIfAbsent(result.planId, result); return previous == null ? result : previous; } public StreamResultFuture getReceivingStream(UUID planId) { - return receivingStreams.get(planId); + return followerStreams.get(planId); } public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) @@ -178,11 +178,15 @@ public class StreamManager implements StreamManagerMBean public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex) { - StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex); + // Search follower session first, because in some tests, eg. StreamingTransferTest, both initiator session + // and follower session are listening to local host. + // TODO CASSANDRA-15665 it's more robust to add "isFollower" flag into {@link StreamMessageHeader} to distinguish + // initiator session and follower session. 
+ StreamSession session = findSession(followerStreams, peer, planId, sessionIndex); if (session != null) return session; - return findSession(receivingStreams, peer, planId, sessionIndex); + return findSession(initiatorStreams, peer, planId, sessionIndex); } private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex) diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 3fcabd0..60845fa 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -64,7 +64,7 @@ public class StreamPlan { this.streamOperation = streamOperation; this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, new DefaultConnectionFactory(), - connectSequentially, pendingRepair, previewKind); + false, connectSequentially, pendingRepair, previewKind); } /** @@ -176,7 +176,7 @@ public class StreamPlan */ public StreamResultFuture execute() { - return StreamResultFuture.init(planId, streamOperation, handlers, coordinator); + return StreamResultFuture.createInitiator(planId, streamOperation, handlers, coordinator); } /** diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 3268ecf..2b5791f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -68,19 +68,19 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> this.coordinator = coordinator; // if there is no session to listen to, we immediately set result for returning - if (!coordinator.isReceiving() && !coordinator.hasActiveSessions()) + if (!coordinator.isFollower() && !coordinator.hasActiveSessions()) set(getCurrentState()); } private StreamResultFuture(UUID planId, StreamOperation 
streamOperation, UUID pendingRepair, PreviewKind previewKind) { - this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind)); + this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), true, false, pendingRepair, previewKind)); } - public static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, - StreamCoordinator coordinator) + public static StreamResultFuture createInitiator(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, + StreamCoordinator coordinator) { - StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator); + StreamResultFuture future = createAndRegisterInitiator(planId, streamOperation, coordinator); if (listeners != null) { for (StreamEventHandler listener : listeners) @@ -100,13 +100,13 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> return future; } - public static synchronized StreamResultFuture initReceivingSide(int sessionIndex, - UUID planId, - StreamOperation streamOperation, - InetAddressAndPort from, - Channel channel, - UUID pendingRepair, - PreviewKind previewKind) + public static synchronized StreamResultFuture createFollower(int sessionIndex, + UUID planId, + StreamOperation streamOperation, + InetAddressAndPort from, + Channel channel, + UUID pendingRepair, + PreviewKind previewKind) { StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) @@ -117,7 +117,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. 
future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind); - StreamManager.instance.registerReceiving(future); + StreamManager.instance.registerFollower(future); } future.attachConnection(from, sessionIndex, channel); logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}", @@ -125,10 +125,10 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> return future; } - private static StreamResultFuture createAndRegister(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) + private static StreamResultFuture createAndRegisterInitiator(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) { StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator); - StreamManager.instance.register(future); + StreamManager.instance.registerInitiator(future); return future; } @@ -141,7 +141,6 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> { StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex); session.init(this); - session.attach(channel); } public void addEventListener(StreamEventHandler listener) diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 05bb5ff..c6ef5f0 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -17,18 +17,18 @@ */ package org.apache.cassandra.streaming; +import java.io.EOFException; import java.net.SocketTimeoutException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; -import com.google.common.util.concurrent.Futures; import org.apache.cassandra.concurrent.ScheduledExecutors; import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.RangesAtEndpoint; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,10 +54,10 @@ import static org.apache.cassandra.net.MessagingService.current_version; /** * Handles the streaming a one or more streams to and from a specific remote node. - * + *<p/> * Both this node and the remote one will create a similar symmetrical {@link StreamSession}. A streaming * session has the following life-cycle: - * + *<pre> * 1. Session Initialization * * (a) A node (the initiator in the following) create a new {@link StreamSession}, @@ -101,12 +101,15 @@ import static org.apache.cassandra.net.MessagingService.current_version; * * 4. Completion phase * - * (a) When a node enters the completion phase, it sends a {@link CompleteMessage} to the peer, and then enter the - * {@link StreamSession.State#WAIT_COMPLETE} state. If it has already received a {@link CompleteMessage} - * from the peer, session is complete and is then closed ({@link #closeSession(State)}). Otherwise, the node - * switch to the {@link StreamSession.State#WAIT_COMPLETE} state and send a {@link CompleteMessage} to the other side. + * (a) When the initiator finishes streaming, it enters the {@link StreamSession.State#WAIT_COMPLETE} state, and waits + * for the follower to send a {@link CompleteMessage} once it finishes streaming too. Once the {@link CompleteMessage} + * is received, initiator sets its own state to {@link StreamSession.State#COMPLETE} and closes all channels attached + * to this session. 
+ * + * </pre> * * In brief, the message passing looks like this (I for initiator, F for follwer): + * <pre> * (session init) * I: StreamInitMessage * (session prepare) @@ -117,7 +120,8 @@ import static org.apache.cassandra.net.MessagingService.current_version; * I: OutgoingStreamMessage * F: ReceivedMessage * (completion) - * I/F: CompleteMessage + * F: CompleteMessage + *</pre> * * All messages which derive from {@link StreamMessage} are sent by the standard internode messaging * (via {@link org.apache.cassandra.net.MessagingService}, while the actual files themselves are sent by a special @@ -127,6 +131,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); + // for test purpose to record received message and state transition + public volatile static MessageStateSink sink = MessageStateSink.NONE; + private final StreamOperation streamOperation; /** @@ -153,43 +160,77 @@ public class StreamSession implements IEndpointStateChangeSubscriber final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>(); + private final boolean isFollower; private final NettyStreamingMessageSender messageSender; - private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>(); + // contains both inbound and outbound channels + private final ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>(); + + // "maybeCompleted()" should be executed at most once. Because it can be executed asynchronously by IO + // threads(serialization/deserialization) and stream messaging processing thread, causing connection closed before + // receiving peer's CompleteMessage. 
+ private boolean maybeCompleted = false; + private Future closeFuture; - private final AtomicBoolean isAborted = new AtomicBoolean(false); private final UUID pendingRepair; private final PreviewKind previewKind; + /** + * State Transition: + * + * <pre> + * +------------------+----------> FAILED <--------------------+ + * | | ^ | + * | | | initiator | + * INITIALIZED --> PREPARING --> STREAMING ------------> WAIT_COMPLETE ----> COMPLETED + * | | | ^ ^ + * | | | follower | | + * | | +-------------------------)-----------------+ + * | | | | + * | | if preview | | + * | +----------------------------------------+ | + * | nothing to request or to transfer | + * +-----------------------------------------------------------------------------+ + * nothing to request or to transfer + * + * </pre> + */ public enum State { - INITIALIZED, - PREPARING, - STREAMING, - WAIT_COMPLETE, - COMPLETE, - FAILED, + INITIALIZED(false), + PREPARING(false), + STREAMING(false), + WAIT_COMPLETE(false), + COMPLETE(true), + FAILED(true); + + private final boolean finalState; + + State(boolean finalState) + { + this.finalState = finalState; + } + + /** + * @return true if current state is final, either COMPLETE OR FAILED. + */ + public boolean isFinalState() + { + return finalState; + } } private volatile State state = State.INITIALIZED; - private volatile boolean completeSent = false; /** * Create new streaming session with the peer. */ public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamConnectionFactory factory, - int index, UUID pendingRepair, PreviewKind previewKind) - { - this(streamOperation, new OutboundConnectionSettings(peer), factory, index, pendingRepair, previewKind); - } - /** - * Create new streaming session with the peer. 
- */ - public StreamSession(StreamOperation streamOperation, OutboundConnectionSettings template, StreamConnectionFactory factory, - int index, UUID pendingRepair, PreviewKind previewKind) + boolean isFollower, int index, UUID pendingRepair, PreviewKind previewKind) { this.streamOperation = streamOperation; - this.peer = template.to; - this.template = template; + this.peer = peer; + this.template = new OutboundConnectionSettings(peer); + this.isFollower = isFollower; this.index = index; this.messageSender = new NettyStreamingMessageSender(this, template, factory, current_version, previewKind.isPreview()); @@ -197,7 +238,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber this.pendingRepair = pendingRepair; this.previewKind = previewKind; - logger.debug("Creating stream session to {}", template); + logger.debug("Creating stream session to {} as {}", template, isFollower ? "follower" : "initiator"); } public UUID planId() @@ -253,11 +294,46 @@ public class StreamSession implements IEndpointStateChangeSubscriber StreamHook.instance.reportStreamFuture(this, streamResult); } - public boolean attach(Channel channel) + /** + * Attach a channel to this session upon receiving the first inbound message. + * + * @param channel The channel to attach. + * @param isControlChannel If the channel is the one to send control messages to. + * @return False if the channel was already attached, true otherwise. 
+ */ + public synchronized boolean attachInbound(Channel channel, boolean isControlChannel) { - if (!messageSender.hasControlChannel()) + failIfFinished(); + + if (!messageSender.hasControlChannel() && isControlChannel) messageSender.injectControlMessageChannel(channel); - return incomingChannels.putIfAbsent(channel.id(), channel) == null; + + channel.closeFuture().addListener(ignored -> onChannelClose(channel)); + return channels.putIfAbsent(channel.id(), channel) == null; + } + + /** + * Attach a channel to this session upon sending the first outbound message. + * + * @param channel The channel to attach. + * @return False if the channel was already attached, true otherwise. + */ + public synchronized boolean attachOutbound(Channel channel) + { + failIfFinished(); + + channel.closeFuture().addListener(ignored -> onChannelClose(channel)); + return channels.putIfAbsent(channel.id(), channel) == null; + } + + /** + * On channel closing, if no channels are left just close the message sender; this must be closed last to ensure + * keep alive messages are sent until the very end of the streaming session. 
+ */ + private synchronized void onChannelClose(Channel channel) + { + if (channels.remove(channel.id()) != null && channels.isEmpty()) + messageSender.close(); } /** @@ -339,7 +415,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private void failIfFinished() { - if (state() == State.COMPLETE || state() == State.FAILED) + if (state().isFinalState()) throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name())); } @@ -399,22 +475,32 @@ public class StreamSession implements IEndpointStateChangeSubscriber private synchronized Future closeSession(State finalState) { - Future abortedTasksFuture = null; - if (isAborted.compareAndSet(false, true)) - { - state(finalState); + // the session is already closed + if (closeFuture != null) + return closeFuture; - // ensure aborting the tasks do not happen on the network IO thread (read: netty event loop) - // as we don't want any blocking disk IO to stop the network thread - if (finalState == State.FAILED) - abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks); + state(finalState); - incomingChannels.values().stream().map(channel -> channel.close()); - messageSender.close(); + List<Future> futures = new ArrayList<>(); - streamResult.handleSessionComplete(this); + // ensure aborting the tasks do not happen on the network IO thread (read: netty event loop) + // as we don't want any blocking disk IO to stop the network thread + if (finalState == State.FAILED) + futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks)); + + // Channels should only be closed by the initiator; but, if this session closed + // due to failure, channels should be always closed regardless, even if this is not the initiator. 
+ if (!isFollower || state != State.COMPLETE) + { + logger.debug("[Stream #{}] Will close attached channels {}", planId(), channels); + channels.values().forEach(channel -> futures.add(channel.close())); } - return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null); + + sink.onClose(peer); + streamResult.handleSessionComplete(this); + closeFuture = FBUtilities.allOf(futures); + + return closeFuture; } private void abortTasks() @@ -426,7 +512,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber } catch (Exception e) { - logger.warn("failed to abort some streaming tasks", e); + logger.warn("[Stream #{}] failed to abort some streaming tasks", planId(), e); } } @@ -437,6 +523,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber */ public void state(State newState) { + if (logger.isTraceEnabled()) + logger.trace("[Stream #{}] Changing session state from {} to {}", planId(), state, newState); + + sink.recordState(peer, newState); state = newState; } @@ -463,21 +553,29 @@ public class StreamSession implements IEndpointStateChangeSubscriber return state == State.COMPLETE; } - public void messageReceived(StreamMessage message) + public synchronized void messageReceived(StreamMessage message) { + if (message.type != StreamMessage.Type.KEEP_ALIVE) + failIfFinished(); + + sink.recordMessage(peer, message.type); + switch (message.type) { case STREAM_INIT: - // nop + // at follower, nop break; case PREPARE_SYN: + // at follower PrepareSynMessage msg = (PrepareSynMessage) message; prepare(msg.requests, msg.summaries); break; case PREPARE_SYNACK: + // at initiator prepareSynAck((PrepareSynAckMessage) message); break; case PREPARE_ACK: + // at follower prepareAck((PrepareAckMessage) message); break; case STREAM: @@ -488,6 +586,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber received(received.tableId, received.sequenceNumber); break; case COMPLETE: + // at initiator complete(); break; 
case KEEP_ALIVE: @@ -529,10 +628,32 @@ public class StreamSession implements IEndpointStateChangeSubscriber } /** - * Call back for handling exception during streaming. + * Signal an error to this stream session: if it's an EOF exception, it tries to understand if the socket was closed + * after completion or because the peer was down, otherwise sends a {@link SessionFailedMessage} and closes + * the session as {@link State#FAILED}. */ - public Future onError(Throwable e) + public synchronized Future onError(Throwable e) { + boolean isEofException = e instanceof EOFException; + if (isEofException) + { + if (state.finalState) + { + logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state); + + return null; + } + else + { + logger.error("[Stream #{}] Socket closed before session completion, peer {} is probably down.", + planId(), + peer.address.getHostAddress(), + e); + + return closeSession(State.FAILED); + } + } + logError(e); // send session failure message if (messageSender.connected()) @@ -577,7 +698,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber */ private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) { - for (StreamRequest request : requests) addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request for (StreamSummary summary : summaries) @@ -589,9 +709,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber prepareSynAck.summaries.add(task.getSummary()); messageSender.sendMessage(prepareSynAck); - streamResult.handleSessionPrepared(this); - maybeCompleted(); + + if (isPreview()) + completePreview(); + else + maybeCompleted(); } private void prepareSynAck(PrepareSynAckMessage msg) @@ -602,7 +725,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber prepareReceiving(summary); // only send the (final) ACK 
if we are expecting the peer to send this node (the initiator) some files - messageSender.sendMessage(new PrepareAckMessage()); + if (!isPreview()) + messageSender.sendMessage(new PrepareAckMessage()); } if (isPreview()) @@ -614,9 +738,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber private void prepareAck(PrepareAckMessage msg) { if (isPreview()) - completePreview(); - else - startStreamingFiles(true); + throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", planId())); + startStreamingFiles(true); } /** @@ -646,7 +769,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber { if (isPreview()) { - throw new RuntimeException("Cannot receive files for preview session"); + throw new RuntimeException(String.format("[Stream #%s] Cannot receive files for preview session", planId())); } long headerSize = message.stream.getSize(); @@ -674,20 +797,49 @@ public class StreamSession implements IEndpointStateChangeSubscriber */ public synchronized void complete() { - logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent); - if (state == State.WAIT_COMPLETE) + logger.debug("[Stream #{}] handling Complete message, state = {}", planId(), state); + + if (!isFollower) { - if (!completeSent) - { - messageSender.sendMessage(new CompleteMessage()); - completeSent = true; - } - closeSession(State.COMPLETE); + if (state == State.WAIT_COMPLETE) + closeSession(State.COMPLETE); + else + state(State.WAIT_COMPLETE); } else { - state(State.WAIT_COMPLETE); + // pre-4.0 nodes should not be connected via streaming, see {@link MessagingService#accept_streaming} + throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", planId())); + } + } + + /** + * Synchronize both {@link #complete()} and {@link #maybeCompleted()} to avoid racing + */ + private synchronized boolean maybeCompleted() + { + if 
(!(receivers.isEmpty() && transfers.isEmpty())) + return false; + + // if already executed once, skip it + if (maybeCompleted) + return true; + + maybeCompleted = true; + if (!isFollower) + { + if (state == State.WAIT_COMPLETE) + closeSession(State.COMPLETE); + else + state(State.WAIT_COMPLETE); + } + else + { + messageSender.sendMessage(new CompleteMessage()); + closeSession(State.COMPLETE); } + + return true; } /** @@ -760,31 +912,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber } } - private boolean maybeCompleted() - { - boolean completed = receivers.isEmpty() && transfers.isEmpty(); - if (completed) - { - if (state == State.WAIT_COMPLETE) - { - if (!completeSent) - { - messageSender.sendMessage(new CompleteMessage()); - completeSent = true; - } - closeSession(State.COMPLETE); - } - else - { - // notify peer that this session is completed - messageSender.sendMessage(new CompleteMessage()); - completeSent = true; - state(State.WAIT_COMPLETE); - } - } - return completed; - } - /** * Flushes matching column families from the given keyspace, or all columnFamilies * if the cf list is empty. 
@@ -843,4 +970,43 @@ public class StreamSession implements IEndpointStateChangeSubscriber { return transferredRangesPerKeyspace.size(); } + + @VisibleForTesting + public static interface MessageStateSink + { + static final MessageStateSink NONE = new MessageStateSink() { + @Override + public void recordState(InetAddressAndPort from, State state) + { + } + + @Override + public void recordMessage(InetAddressAndPort from, StreamMessage.Type message) + { + } + + @Override + public void onClose(InetAddressAndPort from) + { + } + }; + + /** + * @param from peer that is connected in the stream session + * @param state new state to change to + */ + public void recordState(InetAddressAndPort from, StreamSession.State state); + + /** + * @param from peer that sends the given message + * @param message stream message sent by peer + */ + public void recordMessage(InetAddressAndPort from, StreamMessage.Type message); + + /** + * + * @param from peer that is being disconnected + */ + public void onClose(InetAddressAndPort from); + } } diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java index 1314e1d..fba56f5 100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java @@ -21,9 +21,8 @@ package org.apache.cassandra.streaming.async; import java.io.IOError; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.nio.channels.ClosedByInterruptException; import java.util.Collection; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; @@ -33,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +61,7 @@ import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; /** * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s @@ -102,7 +103,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender * A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received). */ - private Channel controlMessageChannel; + private volatile Channel controlMessageChannel; // note: this really doesn't need to be a LBQ, just something that's thread safe private final Collection<ScheduledFuture<?>> channelKeepAlives = new LinkedBlockingQueue<>(); @@ -153,6 +154,9 @@ public class NettyStreamingMessageSender implements StreamingMessageSender return controlMessageChannel != null; } + /** + * Used by follower to setup control message channel created by initiator + */ public void injectControlMessageChannel(Channel channel) { this.controlMessageChannel = channel; @@ -160,11 +164,20 @@ public class NettyStreamingMessageSender implements StreamingMessageSender scheduleKeepAliveTask(channel); } + /** + * Used by initiator to setup control message channel connecting to follower + */ private void setupControlMessageChannel() throws IOException { if (controlMessageChannel == null) { - controlMessageChannel = createChannel(); + /* + * Inbound handlers are needed: + * a) for initiator's control channel(the first outbound channel) to receive follower's message. 
+ * b) for streaming receiver (note: both initiator and follower can receive streaming files) to receive files, + * in {@link Handler#setupStreamingPipeline} + */ + controlMessageChannel = createChannel(true); scheduleKeepAliveTask(controlMessageChannel); } } @@ -181,11 +194,16 @@ public class NettyStreamingMessageSender implements StreamingMessageSender task.future = scheduledFuture; } - private Channel createChannel() throws IOException + private Channel createChannel(boolean isInboundHandlerNeeded) throws IOException { Channel channel = factory.createConnection(template, streamingVersion); - ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast("stream", new StreamingInboundHandler(template.to, streamingVersion, session)); + session.attachOutbound(channel); + + if (isInboundHandlerNeeded) + { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("stream", new StreamingInboundHandler(template.to, streamingVersion, session)); + } channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); logger.debug("Creating channel id {} local {} remote {}", channel.id(), channel.localAddress(), channel.remoteAddress()); return channel; @@ -316,9 +334,10 @@ public class NettyStreamingMessageSender implements StreamingMessageSender if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL)) return; + Channel channel = null; try { - Channel channel = getOrCreateChannel(); + channel = getOrCreateChannel(); if (!channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet(false, true)) throw new IllegalStateException("channel's transferring state is currently set to true. 
refusing to start new stream"); @@ -336,6 +355,19 @@ public class NettyStreamingMessageSender implements StreamingMessageSender { session.onError(e); } + catch (Throwable t) + { + if (closed && Throwables.getRootCause(t) instanceof ClosedByInterruptException && fileTransferExecutor.isShutdown()) + { + logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", createLogTag(session, channel)); + } + else + { + JVMStabilityInspector.inspectThrowable(t); + if (!session.state().isFinalState()) + session.onError(t); + } + } finally { fileTransferSemaphore.release(); @@ -383,7 +415,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender if (channel != null) return channel; - channel = createChannel(); + channel = createChannel(false); threadToChannelMap.put(currentThread, channel); return channel; } @@ -513,6 +545,9 @@ public class NettyStreamingMessageSender implements StreamingMessageSender @Override public void close() { + if (closed) + return; + closed = true; if (logger.isDebugEnabled()) logger.debug("{} Closing stream connection channels on {}", createLogTag(session, null), template.to); @@ -520,14 +555,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender future.cancel(false); channelKeepAlives.clear(); - List<Future<Void>> futures = new ArrayList<>(threadToChannelMap.size()); - for (Channel channel : threadToChannelMap.values()) - futures.add(channel.close()); - FBUtilities.waitOnFutures(futures, 10, TimeUnit.SECONDS); threadToChannelMap.clear(); fileTransferExecutor.shutdownNow(); - - if (controlMessageChannel != null) - controlMessageChannel.close(); } } diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java index a319fea..edc74e3 100644 --- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java +++ 
b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java @@ -18,12 +18,9 @@ package org.apache.cassandra.streaming.async; -import java.io.EOFException; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -31,7 +28,6 @@ import java.util.function.Function; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +40,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.AsyncStreamingInputPlus; -import org.apache.cassandra.net.AsyncStreamingInputPlus.InputTimeoutException; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamReceiveException; import org.apache.cassandra.streaming.StreamResultFuture; @@ -183,7 +178,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS); } - StreamMessage message = StreamMessage.deserialize(buffers, protocolVersion, null); + StreamMessage message = StreamMessage.deserialize(buffers, protocolVersion); // keep-alives don't necessarily need to be tied to a session (they could be arrive before or after // wrt session lifecycle, due to races), just log that we received the message and carry on @@ -203,10 +198,6 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter session.messageReceived(message); } } - catch (InputTimeoutException | EOFException e) - { - // ignore - } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); @@ -248,7 
+239,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter { assert session == null : "initiator of stream session received a StreamInitMessage"; StreamInitMessage init = (StreamInitMessage) message; - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind); + StreamResultFuture.createFollower(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind); streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex)); } else if (message instanceof IncomingStreamMessage) @@ -262,7 +253,9 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter if (streamSession == null) throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message); - streamSession.attach(channel); + // Attach this channel to the session: this only happens upon receiving the first init message as a follower; + // in all other cases, no new control channel will be added, as the proper control channel will be already attached. 
+ streamSession.attachInbound(channel, message instanceof StreamInitMessage); return streamSession; } } diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java index 81e16f7..83d95e0 100644 --- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java @@ -25,7 +25,7 @@ public class CompleteMessage extends StreamMessage { public static Serializer<CompleteMessage> serializer = new Serializer<CompleteMessage>() { - public CompleteMessage deserialize(DataInputPlus in, int version, StreamSession session) + public CompleteMessage deserialize(DataInputPlus in, int version) { return new CompleteMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java index e17c3ab..5f69a90 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java @@ -35,10 +35,10 @@ public class IncomingStreamMessage extends StreamMessage public static Serializer<IncomingStreamMessage> serializer = new Serializer<IncomingStreamMessage>() { @SuppressWarnings("resource") - public IncomingStreamMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException + public IncomingStreamMessage deserialize(DataInputPlus input, int version) throws IOException { StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version); - session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex); + StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex); if (session == null) throw new IllegalStateException(String.format("unknown stream session: %s - %d", 
header.planId, header.sessionIndex)); ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId); diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java index f80c617..5352b3b 100644 --- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java @@ -28,7 +28,7 @@ public class KeepAliveMessage extends StreamMessage { public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>() { - public KeepAliveMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException + public KeepAliveMessage deserialize(DataInputPlus in, int version) throws IOException { return new KeepAliveMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java index 263aabd..8406f80 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java @@ -32,7 +32,7 @@ public class OutgoingStreamMessage extends StreamMessage { public static Serializer<OutgoingStreamMessage> serializer = new Serializer<OutgoingStreamMessage>() { - public OutgoingStreamMessage deserialize(DataInputPlus in, int version, StreamSession session) + public OutgoingStreamMessage deserialize(DataInputPlus in, int version) { throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing stream"); } diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java index f43ff01..97fdff7 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java +++ 
b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java @@ -33,7 +33,7 @@ public class PrepareAckMessage extends StreamMessage //nop } - public PrepareAckMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException + public PrepareAckMessage deserialize(DataInputPlus in, int version) throws IOException { return new PrepareAckMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java index 2d8026c..4e5e8fb 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java @@ -38,7 +38,7 @@ public class PrepareSynAckMessage extends StreamMessage StreamSummary.serializer.serialize(summary, out, version); } - public PrepareSynAckMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException + public PrepareSynAckMessage deserialize(DataInputPlus input, int version) throws IOException { PrepareSynAckMessage message = new PrepareSynAckMessage(); int numSummaries = input.readInt(); diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java index 6fbaafa..e378af7 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java @@ -31,7 +31,7 @@ public class PrepareSynMessage extends StreamMessage { public static Serializer<PrepareSynMessage> serializer = new Serializer<PrepareSynMessage>() { - public PrepareSynMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException + public PrepareSynMessage deserialize(DataInputPlus input, int version) throws IOException { PrepareSynMessage message = new PrepareSynMessage(); // requests diff --git 
a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java index 3988dcc..ff2cdec 100644 --- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java @@ -29,7 +29,7 @@ public class ReceivedMessage extends StreamMessage public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>() { @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open. - public ReceivedMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException + public ReceivedMessage deserialize(DataInputPlus input, int version) throws IOException { return new ReceivedMessage(TableId.deserialize(input), input.readInt()); } diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java index 59ad90e..ca10bcc 100644 --- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java @@ -25,7 +25,7 @@ public class SessionFailedMessage extends StreamMessage { public static Serializer<SessionFailedMessage> serializer = new Serializer<SessionFailedMessage>() { - public SessionFailedMessage deserialize(DataInputPlus in, int version, StreamSession session) + public SessionFailedMessage deserialize(DataInputPlus in, int version) { return new SessionFailedMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index e148790..953f2c4 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -86,7 +86,7 @@ public class 
StreamInitMessage extends StreamMessage out.writeInt(message.previewKind.getSerializationVal()); } - public StreamInitMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException + public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException { InetAddressAndPort from = inetAddressAndPortSerializer.deserialize(in, version); int sessionIndex = in.readInt(); diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 72e180c..2f42f1b 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -43,16 +43,16 @@ public abstract class StreamMessage return 1 + message.type.outSerializer.serializedSize(message, version); } - public static StreamMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException + public static StreamMessage deserialize(DataInputPlus in, int version) throws IOException { Type type = Type.lookupById(in.readByte()); - return type.inSerializer.deserialize(in, version, session); + return type.inSerializer.deserialize(in, version); } /** StreamMessage serializer */ public static interface Serializer<V extends StreamMessage> { - V deserialize(DataInputPlus in, int version, StreamSession session) throws IOException; + V deserialize(DataInputPlus in, int version) throws IOException; void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException; long serializedSize(V message, int version) throws IOException; } diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 1df84ab..115cd43 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -486,6 +486,81 @@ public class FBUtilities 
Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); } } + + /** + * Returns a new {@link Future} wrapping the given list of futures and returning a list of their results. + */ + public static Future<List> allOf(Collection<Future> futures) + { + if (futures.isEmpty()) + return CompletableFuture.completedFuture(null); + + return new Future<List>() + { + @Override + @SuppressWarnings("unchecked") + public List get() throws InterruptedException, ExecutionException + { + List result = new ArrayList<>(futures.size()); + for (Future current : futures) + { + result.add(current.get()); + } + return result; + } + + @Override + @SuppressWarnings("unchecked") + public List get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + List result = new ArrayList<>(futures.size()); + long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit); + for (Future current : futures) + { + long remaining = deadline - System.nanoTime(); + if (remaining <= 0) + throw new TimeoutException(); + + result.add(current.get(remaining, TimeUnit.NANOSECONDS)); + } + return result; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + for (Future current : futures) + { + if (!current.cancel(mayInterruptIfRunning)) + return false; + } + return true; + } + + @Override + public boolean isCancelled() + { + for (Future current : futures) + { + if (!current.isCancelled()) + return false; + } + return true; + } + + @Override + public boolean isDone() + { + for (Future current : futures) + { + if (!current.isDone()) + return false; + } + return true; + } + }; + } + /** * Create a new instance of a partitioner defined in an SSTable Descriptor * @param desc Descriptor of an sstable diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java index bafd03d..956f21e 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java @@ -18,16 +18,39 @@ package org.apache.cassandra.distributed.test; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessage; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.streaming.StreamSession.State.PREPARING; +import static org.apache.cassandra.streaming.StreamSession.State.STREAMING; +import static org.apache.cassandra.streaming.StreamSession.State.WAIT_COMPLETE; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_ACK; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYN; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYNACK; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.RECEIVED; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM_INIT; public class StreamingTest extends TestBaseImpl { @@ -51,6 +74,9 @@ public class StreamingTest extends TestBaseImpl Assert.assertEquals(0, results.length); } + // collect 
message and state + registerSink(cluster, nodes); + cluster.get(nodes).runOnInstance(() -> StorageService.instance.rebuild(null, KEYSPACE, null, null)); { Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE)); @@ -72,4 +98,85 @@ public class StreamingTest extends TestBaseImpl testStreaming(2, 2, 1000, "LeveledCompactionStrategy"); } + public static void registerSink(Cluster cluster, int initiatorNodeId) + { + IInvokableInstance initiatorNode = cluster.get(initiatorNodeId); + InetSocketAddress initiator = initiatorNode.broadcastAddress(); + MessageStateSinkImpl initiatorSink = new MessageStateSinkImpl(); + + for (int node = 1; node <= cluster.size(); node++) + { + if (initiatorNodeId == node) + continue; + + IInvokableInstance followerNode = cluster.get(node); + InetSocketAddress follower = followerNode.broadcastAddress(); + + // verify on initiator's stream session + initiatorSink.messages(follower, Arrays.asList(PREPARE_SYNACK, STREAM, StreamMessage.Type.COMPLETE)); + initiatorSink.states(follower, Arrays.asList(PREPARING, STREAMING, WAIT_COMPLETE, StreamSession.State.COMPLETE)); + + // verify on follower's stream session + MessageStateSinkImpl followerSink = new MessageStateSinkImpl(); + followerSink.messages(initiator, Arrays.asList(STREAM_INIT, PREPARE_SYN, PREPARE_ACK, RECEIVED)); + followerSink.states(initiator, Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE)); + followerNode.runOnInstance(() -> StreamSession.sink = followerSink); + } + + cluster.get(initiatorNodeId).runOnInstance(() -> StreamSession.sink = initiatorSink); + } + + @VisibleForTesting + public static class MessageStateSinkImpl implements StreamSession.MessageStateSink, Serializable + { + // use enum ordinal instead of enum to walk around inter-jvm class loader issue, only classes defined in + // InstanceClassLoader#sharedClassNames are shareable between server jvm and test jvm + public final Map<InetAddress, 
Queue<Integer>> messageSink = new ConcurrentHashMap<>(); + public final Map<InetAddress, Queue<Integer>> stateTransitions = new ConcurrentHashMap<>(); + + public void messages(InetSocketAddress peer, List<StreamMessage.Type> messages) + { + messageSink.put(peer.getAddress(), messages.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new))); + } + + public void states(InetSocketAddress peer, List<StreamSession.State> states) + { + stateTransitions.put(peer.getAddress(), states.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new))); + } + + @Override + public void recordState(InetAddressAndPort from, StreamSession.State state) + { + Queue<Integer> states = stateTransitions.get(from.address); + if (states.peek() == null) + Assert.fail("Unexpected state " + state); + + int expected = states.poll(); + Assert.assertEquals(StreamSession.State.values()[expected], state); + } + + @Override + public void recordMessage(InetAddressAndPort from, StreamMessage.Type message) + { + if (message == StreamMessage.Type.KEEP_ALIVE) + return; + + Queue<Integer> messages = messageSink.get(from.address); + if (messages.peek() == null) + Assert.fail("Unexpected message " + message); + + int expected = messages.poll(); + Assert.assertEquals(StreamMessage.Type.values()[expected], message); + } + + @Override + public void onClose(InetAddressAndPort from) + { + Queue<Integer> states = stateTransitions.get(from.address); + Assert.assertTrue("Missing states: " + states, states.isEmpty()); + + Queue<Integer> messages = messageSink.get(from.address); + Assert.assertTrue("Missing messages: " + messages, messages.isEmpty()); + } + } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java index 2b642a8..744750e 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java +++ 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java @@ -213,8 +213,8 @@ public class ZeroCopyStreamingBenchmark private StreamSession setupStreamingSessionForTest() { - StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); - StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator); + StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, false, null, PreviewKind.NONE); + StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator); InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED)); diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java index c722738..b8115f4 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java @@ -196,8 +196,8 @@ public class CassandraEntireSSTableStreamWriterTest private StreamSession setupStreamingSessionForTest() { - StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); - StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator); + StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, false, null, PreviewKind.NONE); + StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator); InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED)); diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java index eb15e9a..ae3ff92 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java @@ -99,6 +99,7 @@ public class CassandraStreamManagerTest return new StreamSession(StreamOperation.REPAIR, InetAddressAndPort.getByName("127.0.0.1"), connectionFactory, + false, 0, pendingRepair, PreviewKind.NONE); diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index 61adb58..b18d249 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -55,7 +55,7 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), false, 0, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range)), 
RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf")); StreamStateStore store = new StreamStateStore(); @@ -76,7 +76,7 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); + session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), false, 0, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range2)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf")); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); diff --git a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java index a57fcbc..262a200 100644 --- a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java +++ b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java @@ -187,13 +187,14 @@ public class EntireSSTableStreamingCorrectFilesCountTest 1, new DefaultConnectionFactory(), false, + false, null, PreviewKind.NONE); - StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), - StreamOperation.BOOTSTRAP, - Collections.singleton(streamEventHandler), - streamCoordinator); + StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), + StreamOperation.BOOTSTRAP, + Collections.singleton(streamEventHandler), + streamCoordinator); InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); streamCoordinator.addSessionInfo(new SessionInfo(peer, diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 2f4feff..0bf7f20 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -77,7 +77,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); - StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, (template, messagingVersion) -> new EmbeddedChannel(), 0, UUID.randomUUID(), PreviewKind.ALL); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, (template, messagingVersion) -> new EmbeddedChannel(), false, 0, UUID.randomUUID(), PreviewKind.ALL); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -88,6 +88,7 @@ public class StreamTransferTaskTest } // create streaming task that streams those two sstables + session.state(StreamSession.State.PREPARING); StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.id); for (SSTableReader sstable : cfs.getLiveSSTables()) { @@ -98,6 +99,7 @@ public class StreamTransferTaskTest assertEquals(14, task.getTotalNumberOfFiles()); // if file sending completes before timeout then the task should be canceled. 
+ session.state(StreamSession.State.STREAMING); Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS); f.get(); @@ -123,9 +125,9 @@ public class StreamTransferTaskTest public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); - StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); - StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); - StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, null, 0, null, PreviewKind.NONE); + StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, false, null, PreviewKind.NONE); + StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, null, false, 0, null, PreviewKind.NONE); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java index 957869b..76bfa76 100644 --- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java @@ -63,9 +63,11 @@ public class NettyStreamingMessageSenderTest channel = new TestChannel(Integer.MAX_VALUE); channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); UUID pendingRepair = UUID.randomUUID(); - session = new 
StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, (template, messagingVersion) -> null, 0, pendingRepair, PreviewKind.ALL); - StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, session.getPreviewKind()); + session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, (template, messagingVersion) -> null, true, 0, pendingRepair, PreviewKind.ALL); + StreamResultFuture future = StreamResultFuture.createFollower(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, session.getPreviewKind()); session.init(future); + session.attachOutbound(channel); + sender = session.getMessageSender(); sender.setControlMessageChannel(channel); } diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java index 6a2afe8..11f6757 100644 --- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java @@ -127,7 +127,7 @@ public class StreamingInboundHandlerTest private StreamSession createSession(SessionIdentifier sid) { - return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, (template, messagingVersion) -> null, sid.sessionIndex, UUID.randomUUID(), PreviewKind.ALL); + return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, (template, messagingVersion) -> null, true, sid.sessionIndex, UUID.randomUUID(), PreviewKind.ALL); } @Test (expected = IllegalStateException.class) @@ -152,8 +152,8 @@ public class StreamingInboundHandlerTest public void StreamDeserializingTask_deriveSession_IFM_HasSession() { UUID planId = UUID.randomUUID(); - StreamResultFuture future = StreamResultFuture.initReceivingSide(0, planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(), PreviewKind.ALL); - 
StreamManager.instance.register(future); + StreamResultFuture future = StreamResultFuture.createFollower(0, planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(), PreviewKind.ALL); + StreamManager.instance.registerFollower(future); StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0, 0, 0, UUID.randomUUID()); IncomingStreamMessage msg = new IncomingStreamMessage(null, header); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org