This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 7ecef7c CASSANDRASC-131: Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability (#122) 7ecef7c is described below commit 7ecef7c3c2e2137d4cb045ccf429703cacf96370 Author: Yifan Cai <52585731+yifa...@users.noreply.github.com> AuthorDate: Wed May 15 21:17:06 2024 -0700 CASSANDRASC-131: Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability (#122) Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-131 --- .circleci/config.yml | 266 +++++++++++++++++++-- CHANGES.txt | 1 + build.gradle | 104 ++++---- scripts/install-shaded-dtest-jar-local.sh | 48 ++++ .../sidecar/common/server/ThrowingRunnable.java | 28 +++ .../cluster/SidecarLoadBalancingPolicy.java | 5 +- .../sidecar/concurrent/TaskExecutorPool.java | 10 +- .../sidecar/exceptions/ThrowableUtils.java | 35 ++- .../sidecar/restore/RestoreJobManager.java | 24 +- .../ClearSnapshotHandlerIntegrationTest.java | 8 +- .../CreateSnapshotHandlerIntegrationTest.java | 10 +- .../ListSnapshotHandlerIntegrationTest.java | 4 +- .../SSTableImportHandlerIntegrationTest.java | 6 +- .../tokenrange/BaseTokenRangeIntegrationTest.java | 3 - .../sidecar/routes/tokenrange/JoiningBaseTest.java | 7 +- .../tokenrange/JoiningTestDoubleCluster.java | 4 +- .../routes/tokenrange/JoiningTestMultiDC.java | 4 +- .../JoiningTestMultiDCSingleReplicated.java | 4 +- .../tokenrange/JoiningTestMultipleNodes.java | 4 +- .../routes/tokenrange/JoiningTestSingleNode.java | 2 +- .../sidecar/routes/tokenrange/LeavingBaseTest.java | 7 +- .../sidecar/routes/tokenrange/LeavingTest.java | 8 +- .../routes/tokenrange/LeavingTestMultiDC.java | 4 +- .../tokenrange/LeavingTestMultiDCHalveCluster.java | 15 +- .../sidecar/routes/tokenrange/MovingBaseTest.java | 9 +- .../routes/tokenrange/MovingMultiDCTest.java | 4 +- .../sidecar/routes/tokenrange/MovingTest.java | 2 +- .../routes/tokenrange/ReplacementBaseTest.java | 35 +-- .../routes/tokenrange/ReplacementMultiDCTest.java | 4 +- .../sidecar/routes/tokenrange/ReplacementTest.java | 2 +- .../sidecar/testing/IntegrationTestBase.java | 72 +++++- .../testing/AbstractCassandraTestContext.java | 45 +++- .../cassandra/testing/CassandraTestTemplate.java | 43 +++- .../testing/ConfigurableCassandraTestContext.java | 59 +---- .../apache/cassandra/sidecar/AssertionUtils.java | 13 +- .../sidecar/restore/RestoreJobManagerTest.java | 40 ++-- .../sstableuploads/SSTableUploadHandlerTest.java | 20 +- src/test/resources/logback-sidecar.xml | 36 +++ 38 files changed, 757 insertions(+), 238 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 8f263a9..15ad8bb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -88,19 +88,62 @@ jobs: - store_test_results: path: build/test-results/ + build-dtest-jdk8: + docker: + - image: cimg/openjdk:8.0 + resource_class: large + steps: + - install_common + - checkout + - run: + name: Build jvm dtest jars with jdk8 + command: | + CASSANDRA_USE_JDK11=false BRANCHES="cassandra-4.0 cassandra-4.1" scripts/build-dtest-jars.sh + - persist_to_workspace: + root: dtest-jars + paths: + - "*.jar" + integration_cassandra_40_java8: docker: - image: cimg/openjdk:8.0 environment: - INTEGRATION_MAX_PARALLEL_FORKS: 2 - INTEGRATION_MAX_HEAP_SIZE: "3500M" + INTEGRATION_MAX_PARALLEL_FORKS: 3 + INTEGRATION_MAX_HEAP_SIZE: "2500M" + resource_class: large + steps: + - install_common + - checkout + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.0" checkstyleIntegrationTest spotbugsIntegrationTest integrationTestLightWeight --stacktrace + + - store_artifacts: + path: build/reports + destination: test-reports + + - store_artifacts: + path: build/test-results/ + destination: test-results + + - store_test_results: + path: build/test-results/ + + integration_heavy_cassandra_40_java8: + docker: + - image: cimg/openjdk:8.0 + environment: + INTEGRATION_MAX_PARALLEL_FORKS: 1 + INTEGRATION_MAX_HEAP_SIZE: "7000M" resource_class: large steps: - install_common - checkout - # Cassandra 4.0 jar seems to be missing some dependencies, so we use 4.1 here (this is what we currently do) - - run: BRANCHES="cassandra-4.0 cassandra-4.1" scripts/build-dtest-jars.sh - - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.0" checkstyleIntegrationTest spotbugsIntegrationTest integrationTest --stacktrace + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.0" integrationTestHeavyWeight --stacktrace - store_artifacts: path: build/reports @@ -117,14 +160,42 @@ jobs: docker: - image: cimg/openjdk:8.0 environment: - INTEGRATION_MAX_PARALLEL_FORKS: 2 - INTEGRATION_MAX_HEAP_SIZE: "3500M" + INTEGRATION_MAX_PARALLEL_FORKS: 3 + INTEGRATION_MAX_HEAP_SIZE: "2500M" + resource_class: large + steps: + - install_common + - checkout + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.1" checkstyleIntegrationTest spotbugsIntegrationTest integrationTestLightWeight --stacktrace + + - store_artifacts: + path: build/reports + destination: test-reports + + - store_artifacts: + path: build/test-results/ + destination: test-results + + - store_test_results: + path: build/test-results/ + + integration_heavy_cassandra_41_java8: + docker: + - image: cimg/openjdk:8.0 + environment: + INTEGRATION_MAX_PARALLEL_FORKS: 1 + INTEGRATION_MAX_HEAP_SIZE: "7000M" resource_class: large steps: - install_common - checkout - - run: BRANCHES="cassandra-4.1" scripts/build-dtest-jars.sh - - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.1" checkstyleIntegrationTest spotbugsIntegrationTest integrationTest --stacktrace + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.1" integrationTestHeavyWeight --stacktrace - store_artifacts: path: build/reports @@ -156,19 +227,62 @@ jobs: - store_test_results: path: build/test-results/ + build-dtest-jdk11: + docker: + - image: cimg/openjdk:11.0 + resource_class: large + steps: + - install_common + - checkout + - run: + name: Build jvm dtest jars with jdk11 + command: | + CASSANDRA_USE_JDK11=true scripts/build-dtest-jars.sh + - persist_to_workspace: + root: dtest-jars + paths: + - "*.jar" + integration_cassandra_40_java11: docker: - image: cimg/openjdk:11.0 environment: - INTEGRATION_MAX_PARALLEL_FORKS: 2 - INTEGRATION_MAX_HEAP_SIZE: "3500M" + INTEGRATION_MAX_PARALLEL_FORKS: 3 + INTEGRATION_MAX_HEAP_SIZE: "2500M" resource_class: large steps: - install_common - checkout - # Cassandra 4.0 jar seems to be missing some dependencies, so we use 4.1 here (this is what we currently do) - - run: BRANCHES="cassandra-4.0 cassandra-4.1" CASSANDRA_USE_JDK11=true scripts/build-dtest-jars.sh - - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.0" checkstyleIntegrationTest spotbugsIntegrationTest integrationTest --stacktrace + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.0" checkstyleIntegrationTest spotbugsIntegrationTest integrationTestLightWeight --stacktrace + + - store_artifacts: + path: build/reports + destination: test-reports + + - store_artifacts: + path: build/test-results/ + destination: test-results + + - store_test_results: + path: build/test-results/ + + integration_heavy_cassandra_40_java11: + docker: + - image: cimg/openjdk:11.0 + environment: + INTEGRATION_MAX_PARALLEL_FORKS: 1 + INTEGRATION_MAX_HEAP_SIZE: "7000M" + resource_class: large + steps: + - install_common + - checkout + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=4.1.4 -Dcassandra.sidecar.versions_to_test="4.0" integrationTestHeavyWeight --stacktrace - store_artifacts: path: build/reports @@ -185,14 +299,42 @@ jobs: docker: - image: cimg/openjdk:11.0 environment: - INTEGRATION_MAX_PARALLEL_FORKS: 2 - INTEGRATION_MAX_HEAP_SIZE: "3500M" + INTEGRATION_MAX_PARALLEL_FORKS: 3 + INTEGRATION_MAX_HEAP_SIZE: "2500M" + resource_class: large + steps: + - install_common + - checkout + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=5.0-alpha2 -Dcassandra.sidecar.versions_to_test="5.0" checkstyleIntegrationTest spotbugsIntegrationTest integrationTestLightWeight --stacktrace + + - store_artifacts: + path: build/reports + destination: test-reports + + - store_artifacts: + path: build/test-results/ + destination: test-results + + - store_test_results: + path: build/test-results/ + + integration_heavy_cassandra_50_java11: + docker: + - image: cimg/openjdk:11.0 + environment: + INTEGRATION_MAX_PARALLEL_FORKS: 1 + INTEGRATION_MAX_HEAP_SIZE: "7000M" resource_class: large steps: - install_common - checkout - - run: BRANCHES="cassandra-5.0" scripts/build-dtest-jars.sh - - run: ./gradlew --no-daemon -PdtestVersion=5.0-alpha2 -Dcassandra.sidecar.versions_to_test="5.0" checkstyleIntegrationTest spotbugsIntegrationTest integrationTest --stacktrace + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=5.0-alpha2 -Dcassandra.sidecar.versions_to_test="5.0" integrationTestHeavyWeight --stacktrace - store_artifacts: path: build/reports @@ -209,14 +351,42 @@ jobs: docker: - image: cimg/openjdk:11.0 environment: - INTEGRATION_MAX_PARALLEL_FORKS: 2 - INTEGRATION_MAX_HEAP_SIZE: "3500M" + INTEGRATION_MAX_PARALLEL_FORKS: 3 + INTEGRATION_MAX_HEAP_SIZE: "2500M" resource_class: large steps: - install_common - checkout - - run: BRANCHES="trunk" scripts/build-dtest-jars.sh - - run: ./gradlew --no-daemon -PdtestVersion=5.1 -Dcassandra.sidecar.versions_to_test="5.1" checkstyleIntegrationTest spotbugsIntegrationTest integrationTest --stacktrace + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=5.1 -Dcassandra.sidecar.versions_to_test="5.1" checkstyleIntegrationTest spotbugsIntegrationTest integrationTestLightWeight --stacktrace + + - store_artifacts: + path: build/reports + destination: test-reports + + - store_artifacts: + path: build/test-results/ + destination: test-results + + - store_test_results: + path: build/test-results/ + + integration_heavy_cassandra_trunk_java11: + docker: + - image: cimg/openjdk:11.0 + environment: + INTEGRATION_MAX_PARALLEL_FORKS: 1 + INTEGRATION_MAX_HEAP_SIZE: "7000M" + resource_class: large + steps: + - install_common + - checkout + - attach_workspace: + at: dtest-jars + - run: ./scripts/install-shaded-dtest-jar-local.sh + - run: ./gradlew --no-daemon -PdtestVersion=5.1 -Dcassandra.sidecar.versions_to_test="5.1" integrationTestHeavyWeight --stacktrace - store_artifacts: path: build/reports @@ -275,64 +445,116 @@ workflows: build-and-test: jobs: - unit_java8 + - build-dtest-jdk8 - integration_cassandra_40_java8: requires: - unit_java8 + - build-dtest-jdk8 - integration_cassandra_41_java8: requires: - unit_java8 + - build-dtest-jdk8 + - integration_heavy_cassandra_40_java8: + requires: + - unit_java8 + - build-dtest-jdk8 + - integration_heavy_cassandra_41_java8: + requires: + - unit_java8 + - build-dtest-jdk8 - unit_java11 + - build-dtest-jdk11 - integration_cassandra_40_java11: requires: - unit_java11 + - build-dtest-jdk11 - integration_cassandra_50_java11: requires: - unit_java11 + - build-dtest-jdk11 - integration_cassandra_trunk_java11: requires: - unit_java11 + - build-dtest-jdk11 + - integration_heavy_cassandra_40_java11: + requires: + - unit_java11 + - build-dtest-jdk11 + - integration_heavy_cassandra_50_java11: + requires: + - unit_java11 + - build-dtest-jdk11 + - integration_heavy_cassandra_trunk_java11: + requires: + - unit_java11 + - build-dtest-jdk11 - docs_build: requires: - unit_java8 - integration_cassandra_40_java8 + - integration_heavy_cassandra_40_java8 - integration_cassandra_41_java8 + - integration_heavy_cassandra_41_java8 - unit_java11 - integration_cassandra_40_java11 + - integration_heavy_cassandra_40_java11 - integration_cassandra_50_java11 + - integration_heavy_cassandra_50_java11 - integration_cassandra_trunk_java11 + - integration_heavy_cassandra_trunk_java11 - docker_build: requires: - unit_java8 - integration_cassandra_40_java8 + - integration_heavy_cassandra_40_java8 - integration_cassandra_41_java8 + - integration_heavy_cassandra_41_java8 - unit_java11 - integration_cassandra_40_java11 + - integration_heavy_cassandra_40_java11 - integration_cassandra_50_java11 + - integration_heavy_cassandra_50_java11 - integration_cassandra_trunk_java11 + - integration_heavy_cassandra_trunk_java11 - rpm_build_install: requires: - unit_java8 - integration_cassandra_40_java8 + - integration_heavy_cassandra_40_java8 - integration_cassandra_41_java8 + - integration_heavy_cassandra_41_java8 - unit_java11 - integration_cassandra_40_java11 + - integration_heavy_cassandra_40_java11 - integration_cassandra_50_java11 + - integration_heavy_cassandra_50_java11 - integration_cassandra_trunk_java11 + - integration_heavy_cassandra_trunk_java11 - deb_build_install: requires: - unit_java8 - integration_cassandra_40_java8 + - integration_heavy_cassandra_40_java8 - integration_cassandra_41_java8 + - integration_heavy_cassandra_41_java8 - unit_java11 - integration_cassandra_40_java11 + - integration_heavy_cassandra_40_java11 - integration_cassandra_50_java11 + - integration_heavy_cassandra_50_java11 - integration_cassandra_trunk_java11 + - integration_heavy_cassandra_trunk_java11 - docker_build: requires: - unit_java8 - integration_cassandra_40_java8 + - integration_heavy_cassandra_40_java8 - integration_cassandra_41_java8 + - integration_heavy_cassandra_41_java8 - unit_java11 - integration_cassandra_40_java11 + - integration_heavy_cassandra_40_java11 - integration_cassandra_50_java11 + - integration_heavy_cassandra_50_java11 - integration_cassandra_trunk_java11 + - integration_heavy_cassandra_trunk_java11 diff --git a/CHANGES.txt b/CHANGES.txt index d1c362c..f53f076 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability (CASSANDRASC-131) * Reduce implementations accessible from client (CASSANDRASC-127) * Fix wait time acquired in SidecarRateLimiter (CASSANDRASC-124) * Make RestoreJobDiscoverer less verbose (CASSANDRASC-126) diff --git a/build.gradle b/build.gradle index 6f048d5..91d8bdf 100644 --- a/build.gradle +++ b/build.gradle @@ -244,6 +244,7 @@ dependencies { } integrationTestImplementation(group: 'org.apache.cassandra', name: "${dtestDependencyName}", version: "${dtestVersion}") integrationTestImplementation(group: 'org.apache.cassandra', name: 'dtest-api', version: "${dtestApiVersion}") + integrationTestImplementation "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}" // Needed by the Cassandra dtest framework integrationTestImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}") // Needed for snapshot manifest validation @@ -310,6 +311,8 @@ clean.doLast { test { systemProperty "vertxweb.environment", "dev" + systemProperty "logback.configurationFile", "logback-sidecar.xml" + systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory" // ordinarily we don't need integration tests // see the integrationTest task useJUnitPlatform() @@ -342,51 +345,63 @@ def JDK11_OPTIONS = ['-Djdk.attach.allowAttachSelf=true', '--add-opens', 'java.base/jdk.internal.util.jar=ALL-UNNAMED', '--add-opens', 'jdk.management/com.sun.management.internal=ALL-UNNAMED'] -tasks.register("integrationTest", Test) { - if (JavaVersion.current().isJava11Compatible()) { - jvmArgs(JDK11_OPTIONS) - println("JVM arguments for $project.name are $allJvmArgs") - } - // Disable direct memory allocator as it doesn't release properly - systemProperty "cassandra.netty_use_heap_allocator", "true" - systemProperty "vertxweb.environment", "dev" - // config logback for in-jvm clusters - systemProperty "logback.configurationFile", "src/test/resources/logback-in-jvm-dtest.xml" - // Because tests are forked, we need to explicitly pass the system property from the - // Gradle JVM down to the children - - def versionsToTest = System.getProperty("cassandra.sidecar.versions_to_test", null) - if (versionsToTest != "" && versionsToTest != null) { - systemProperty "cassandra.sidecar.versions_to_test", versionsToTest - } - useJUnitPlatform() { - includeTags "integrationTest" - } - reports { - junitXml.enabled = true - def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "integration").toFile() - println("Destination directory for integration tests: ${destDir}") - junitXml.destination = destDir - html.enabled = true - } - testLogging { - events "passed", "skipped", "failed" - } - testClassesDirs = sourceSets.integrationTest.output.classesDirs - classpath = sourceSets.integrationTest.runtimeClasspath - shouldRunAfter test - forkEvery = 1 // DTest framework tends to have issues without forkEvery test class - maxHeapSize = integrationMaxHeapSize - maxParallelForks = integrationMaxParallelForks - - afterTest { descriptor, result -> - def totalTime = (result.endTime - result.startTime) / 1000.0 - logger.lifecycle("--") - if (totalTime >= 60) { // log the tests that take 1 minute and more - logger.warn("$descriptor.displayName took $totalTime s") +def integrationTest = task("integrationTest") + +['integrationTestLightWeight', 'integrationTestHeavyWeight'].each { name -> + tasks.register(name, Test) { + if (JavaVersion.current().isJava11Compatible()) { + jvmArgs(JDK11_OPTIONS) + println("JVM arguments for $project.name are $allJvmArgs") } - else { - logger.info("$descriptor.displayName took $totalTime s") + // Disable direct memory allocator as it doesn't release properly + systemProperty "cassandra.netty_use_heap_allocator", "true" + systemProperty "vertxweb.environment", "dev" + // config logback for in-jvm clusters + systemProperty "logback.configurationFile", "src/test/resources/logback-in-jvm-dtest.xml" + // Because tests are forked, we need to explicitly pass the system property from the + // Gradle JVM down to the children + + def versionsToTest = System.getProperty("cassandra.sidecar.versions_to_test", null) + if (versionsToTest != "" && versionsToTest != null) { + systemProperty "cassandra.sidecar.versions_to_test", versionsToTest + } + useJUnitPlatform() { + if (name.contains("HeavyWeight")) + { + includeTags "heavy" + } + else + { + excludeTags "heavy" + } + } + + reports { + junitXml.enabled = true + def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "integration").toFile() + println("Destination directory for integration tests: ${destDir}") + junitXml.destination = destDir + html.enabled = true + } + testLogging { + events "passed", "skipped", "failed" + } + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + shouldRunAfter test + forkEvery = 1 // DTest framework tends to have issues without forkEvery test class + maxHeapSize = integrationMaxHeapSize + maxParallelForks = integrationMaxParallelForks + + afterTest { descriptor, result -> + def totalTime = (result.endTime - result.startTime) / 1000.0 + logger.lifecycle("--") + if (totalTime >= 60) { // log the tests that take 1 minute and more + logger.warn("$descriptor.displayName took $totalTime s") + } + else { + logger.info("$descriptor.displayName took $totalTime s") + } } } } @@ -508,6 +523,7 @@ spotbugsContainerTest.onlyIf { "true" != System.getenv("skipContainerTest") } // copyDist gets called on every build copyDist.dependsOn installDist, copyJolokia +integrationTest.dependsOn integrationTestLightWeight, integrationTestHeavyWeight check.dependsOn codeCheckTasks, containerTest, integrationTest, jacocoTestReport build.dependsOn copyDist, copyJolokia, copyDocs run.dependsOn build diff --git a/scripts/install-shaded-dtest-jar-local.sh b/scripts/install-shaded-dtest-jar-local.sh new file mode 100755 index 0000000..2104270 --- /dev/null +++ b/scripts/install-shaded-dtest-jar-local.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -xe + +REPO_DIR="${M2_HOME:-${HOME}/.m2/repository}" +SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; ) +DTEST_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dtest-jars" +DTEST_ARTIFACT_ID=cassandra-dtest-local-all + +function extract_version { + echo "$1" | sed -n "s/^.*dtest-jars\/dtest-\(\S*\)\..*$/\1/p" +} + +function install { + CASSANDRA_VERSION="$1" + "${SCRIPT_DIR}/mvnw" install:install-file \ + -Dfile="${DTEST_JAR_DIR}/dtest-${CASSANDRA_VERSION}.jar" \ + -DgroupId=org.apache.cassandra \ + -DartifactId="${DTEST_ARTIFACT_ID}" \ + -Dversion="${CASSANDRA_VERSION}" \ + -Dpackaging=jar \ + -DgeneratePom=true \ + -DlocalRepositoryPath="${REPO_DIR}" +} + +for jar in "${DTEST_JAR_DIR}"/*.jar; do + if [ -f "$jar" ]; then + echo "Installing the jar: $jar" + install "$(extract_version ${jar})" + fi +done diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ThrowingRunnable.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ThrowingRunnable.java new file mode 100644 index 0000000..cb72be9 --- /dev/null +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ThrowingRunnable.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.server; + +/** + * Similar to {@link Runnable}, but it can throw {@link Exception} + */ +@FunctionalInterface +public interface ThrowingRunnable +{ + void run() throws Exception; +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java index 9f5e040..db3603e 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java @@ -127,7 +127,7 @@ class SidecarLoadBalancingPolicy implements LoadBalancingPolicy public synchronized void onDown(Host host) { // Don't remove local addresses from the selected host list - if (localHostAddresses.contains(host.getBroadcastRpcAddress())) + if (localHostAddresses.contains(driverUtils.getSocketAddress(host))) { LOGGER.debug("Local Node {} has been marked down.", host); return; @@ -190,7 +190,8 @@ class SidecarLoadBalancingPolicy implements LoadBalancingPolicy { if (localHosts.size() < numLocalHostsConfigured) { - LOGGER.warn("Could not find all configured local hosts in host list."); + LOGGER.warn("Could not find all configured local hosts in host list. ConfiguredHosts={} AvailableHosts={}", + numLocalHostsConfigured, localHosts.size()); } selectedHosts.addAll(localHosts); } diff --git a/src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java b/src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java index f491afd..c76f4c9 100644 --- a/src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java +++ b/src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java @@ -27,6 +27,7 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.metrics.StopWatch; @@ -225,15 +226,6 @@ public abstract class TaskExecutorPool implements WorkerExecutor : workerExecutor.close(); } - /** - * Similar to {@link Runnable}, but it can throw {@link Exception} - */ - @FunctionalInterface - public interface ThrowingRunnable - { - void run() throws Exception; - } - /** * {@link ServiceTaskExecutorPool} is used for executing tasks that are short lived and not expected to block for * too long, therefore will free up resources more quickly diff --git a/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java b/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java index a1ab72e..db00897 100644 --- a/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java +++ b/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java @@ -18,23 +18,49 @@ package org.apache.cassandra.sidecar.exceptions; +import java.util.concurrent.Callable; import java.util.function.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; /** * Collection of utility methods for understanding {@link Throwable} thrown better */ public class ThrowableUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(ThrowableUtils.class); - private ThrowableUtils() { throw new UnsupportedOperationException(); } + /** + * Run the {@code actionMayThrow} and wrap any {@link Exception} thrown in {@link RuntimeException} + * @param actionMayThrow action that may throw exceptions + * @param <R> return value type of the action + */ + public static <R> R propagate(Callable<R> actionMayThrow) + { + try + { + return actionMayThrow.call(); + } + catch (Exception cause) + { + throw new RuntimeException(cause); + } + } + + /** + * Similar to {@link #propagate(Callable)}, but takes runnable-ish + */ + public static void propagate(ThrowingRunnable actionMayThrow) + { + propagate(() -> { + actionMayThrow.run(); + return null; + }); + } + /** * Get the first throwable in the exception chain that matches with the expected throwable class. * When there is circular exception reference, it tries to visit all exceptions in the chain at least once @@ -80,7 +106,6 @@ public class ThrowableUtils { // Mark the position to stop, and continue tracing the cause up until hitting stop the next time. // This way we are sure that all exceptions/causes are visited at least once. -// LOGGER.warn("Circular exception reference detected!", throwable); stop = cause; } cause = getCause(cause, 1); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java index c6cf008..cb68a8a 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java @@ -43,6 +43,8 @@ import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreSlice; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; +import org.apache.cassandra.sidecar.exceptions.ThrowableUtils; +import org.jetbrains.annotations.VisibleForTesting; /** * Manages the restore job per instance @@ -63,13 +65,26 @@ public class RestoreJobManager InstanceMetadata instanceMetadata, ExecutorPools executorPools, RestoreProcessor restoreProcessor) + { + this(restoreJobConfig, instanceMetadata, executorPools, restoreProcessor, true); + } + + @VisibleForTesting + public RestoreJobManager(RestoreJobConfiguration restoreJobConfig, + InstanceMetadata instanceMetadata, + ExecutorPools executorPools, + RestoreProcessor restoreProcessor, + boolean deleteOnStart) { this.restoreJobConfig = restoreJobConfig; this.instanceMetadata = instanceMetadata; this.executorPools = executorPools; this.processor = restoreProcessor; // delete obsolete on start up. Once instance is started, the jobDiscoverer will find the jobs to cleanup - deleteObsoleteDataAsync(); + if (deleteOnStart) + { + deleteObsoleteDataAsync(); + } } /** @@ -206,12 +221,11 @@ public class RestoreJobManager { pathStream .sorted(Comparator.reverseOrder()) - .map(Path::toFile) - .forEach(File::delete); + .forEach(path -> ThrowableUtils.propagate(() -> Files.delete(path))); } - catch (IOException ioe) // thrown from Files.walk. + catch (Exception exception) { - LOGGER.warn("Error on deleting data. Path={}", root, ioe); + LOGGER.warn("Error on deleting data. Path={}", root, exception); } }); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandlerIntegrationTest.java index 2b2f276..08c4dc8 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandlerIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandlerIntegrationTest.java @@ -80,7 +80,7 @@ class ClearSnapshotHandlerIntegrationTest extends IntegrationTestBase context.verify(() -> { assertThat(createResponse.statusCode()).isEqualTo(OK.code()); final List<Path> found = - findChildFile(sidecarTestContext, "127.0.0.1", snapshotName); + findChildFile(sidecarTestContext, "127.0.0.1", tableName.keyspace(), snapshotName); assertThat(found).isNotEmpty(); // snapshot directory exists inside data directory @@ -96,7 +96,9 @@ class ClearSnapshotHandlerIntegrationTest extends IntegrationTestBase assertThat(deleteResponse.statusCode()).isEqualTo(OK.code()); final List<Path> found2 = findChildFile(sidecarTestContext, - "127.0.0.1", snapshotName); + "127.0.0.1", + tableName.keyspace(), + snapshotName); assertThat(found2).isEmpty(); context.completeNow(); }))); @@ -108,7 +110,7 @@ class ClearSnapshotHandlerIntegrationTest extends IntegrationTestBase private QualifiedTableName createTestTableAndPopulate() { QualifiedTableName tableName = createTestTable( - "CREATE TABLE %s (id text PRIMARY KEY, name text);"); + "CREATE TABLE %s (id text PRIMARY KEY, name text)" + WITH_COMPACTION_DISABLED + ";"); Session session = maybeGetSession(); session.execute("INSERT INTO " + tableName + " (id, name) VALUES ('1', 'Francisco');"); diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandlerIntegrationTest.java index 5936c51..1b40c29 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandlerIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandlerIntegrationTest.java @@ -184,7 +184,7 @@ class CreateSnapshotHandlerIntegrationTest extends IntegrationTestBase // validate that the snapshot is created List<Path> found = findChildFile(sidecarTestContext, "127.0.0.1", - "my-snapshot"); + tableName.keyspace(), "my-snapshot"); assertThat(found).isNotEmpty() .anyMatch(p -> p.toString().endsWith("manifest.json")) .anyMatch(p -> p.toString().endsWith("schema.cql")) @@ -219,7 +219,7 @@ class CreateSnapshotHandlerIntegrationTest extends IntegrationTestBase // validate that the snapshot is created List<Path> found = findChildFile(sidecarTestContext, "127.0.0.1", - "ttl-snapshot"); + tableName.keyspace(), "ttl-snapshot"); assertThat(found).isNotEmpty() .anyMatch(p -> p.toString().endsWith("manifest.json")) .anyMatch(p -> p.toString().endsWith("schema.cql")) @@ -285,7 +285,7 @@ class CreateSnapshotHandlerIntegrationTest extends IntegrationTestBase // validate that the snapshot is created List<Path> found = findChildFile(sidecarTestContext, "127.0.0.1", - "my-snapshot"); + tableName.keyspace(), "my-snapshot"); assertThat(found).isNotEmpty() .anyMatch(p -> p.toString().endsWith("manifest.json")) .anyMatch(p -> p.toString().endsWith("schema.cql")) @@ -300,7 +300,7 @@ class CreateSnapshotHandlerIntegrationTest extends IntegrationTestBase private QualifiedTableName createTestTableAndPopulate(String tableNamePrefix) { QualifiedTableName tableName = createTestTable(tableNamePrefix, - "CREATE TABLE %s (id text PRIMARY KEY, name text);"); + "CREATE TABLE %s (id text PRIMARY KEY, name text)" + WITH_COMPACTION_DISABLED + ";"); Session session = maybeGetSession(); session.execute("INSERT INTO " + tableName + " (id, name) VALUES ('1', 'Francisco');"); @@ -312,7 +312,7 @@ class CreateSnapshotHandlerIntegrationTest extends IntegrationTestBase private QualifiedTableName createTestTableAndPopulate() { QualifiedTableName tableName = createTestTable( - "CREATE TABLE %s (id text PRIMARY KEY, name text);"); + "CREATE TABLE %s (id text PRIMARY KEY, name text)" + WITH_COMPACTION_DISABLED + ";"); Session session = maybeGetSession(); session.execute("INSERT INTO " + tableName + " (id, name) VALUES ('1', 'Francisco');"); diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandlerIntegrationTest.java index 24bfb30..b30ec5b 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandlerIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandlerIntegrationTest.java @@ -54,7 +54,7 @@ class ListSnapshotHandlerIntegrationTest extends IntegrationTestBase " cyclist_name text, \n" + " rank int, \n" + " PRIMARY KEY ((race_year, race_name), rank) \n" + - ");"); + ")" + WITH_COMPACTION_DISABLED + ";"); // Insert a single record (only one data directory will be created from the configured 4 data dirs per instance) session.execute("INSERT INTO " + TEST_KEYSPACE + ".rank_by_year_and_name (race_year, race_name, cyclist_name, rank) " + @@ -69,7 +69,7 @@ class ListSnapshotHandlerIntegrationTest extends IntegrationTestBase // validate that the snapshot is created List<Path> found = findChildFile(sidecarTestContext, "127.0.0.1", - "rank_snapshot"); + TEST_KEYSPACE, "rank_snapshot"); assertThat(found).isNotEmpty() .anyMatch(p -> p.toString().endsWith("manifest.json")) .anyMatch(p -> p.toString().endsWith("schema.cql")) diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java index 49049b0..a883ffd 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java @@ -89,8 +89,8 @@ public class SSTableImportHandlerIntegrationTest extends IntegrationTestBase "--", tableName.keyspace()).getStdout(); assertThat(snapshotStdout).contains("Snapshot directory: " + tableName.tableName() + "-snapshot"); // find the directory in the filesystem - final List<Path> snapshotFiles = findChildFile(sidecarTestContext, - "127.0.0.1", tableName.tableName() + "-snapshot"); + final List<Path> snapshotFiles = findChildFile(sidecarTestContext, "127.0.0.1", + tableName.keyspace(), tableName.tableName() + "-snapshot"); assertThat(snapshotFiles).isNotEmpty(); @@ -188,7 +188,7 @@ public class SSTableImportHandlerIntegrationTest extends IntegrationTestBase List<String> values) { QualifiedTableName tableName = createTestTable( - "CREATE TABLE IF NOT EXISTS %s (id text, PRIMARY KEY(id));"); + "CREATE TABLE IF NOT EXISTS %s (id text, PRIMARY KEY(id))" + WITH_COMPACTION_DISABLED + ";"); Session session = maybeGetSession(); populateTable(session, tableName, values); return tableName; diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java index 08422d3..d08131d 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.sidecar.routes.tokenrange; -import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; @@ -121,7 +120,6 @@ public class BaseTokenRangeIntegrationTest extends IntegrationTestBase protected UpgradeableCluster getMultiDCCluster(BiConsumer<ClassLoader, Integer> initializer, ConfigurableCassandraTestContext cassandraTestContext) - throws IOException { return getMultiDCCluster(initializer, cassandraTestContext, null); } @@ -129,7 +127,6 @@ public class BaseTokenRangeIntegrationTest extends IntegrationTestBase protected UpgradeableCluster getMultiDCCluster(BiConsumer<ClassLoader, Integer> initializer, ConfigurableCassandraTestContext cassandraTestContext, Consumer<UpgradeableCluster.Builder> additionalConfigurator) - throws IOException { CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; TokenSupplier mdcTokenSupplier = TestTokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc(), diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningBaseTest.java index 0dea56c..5dad793 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningBaseTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningBaseTest.java @@ -97,12 +97,13 @@ class JoiningBaseTest extends BaseTokenRangeIntegrationTest Feature.JMX, Feature.NATIVE_PROTOCOL); }); - new Thread(() -> newInstance.startup(cluster)).start(); + startAsync("Start new node node" + newInstance.config().num(), + () -> newInstance.startup(cluster)); newInstances.add(newInstance); } } - awaitLatchOrTimeout(transientStateStart, 2, TimeUnit.MINUTES); + awaitLatchOrThrow(transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); for (IUpgradeableInstance newInstance : newInstances) { @@ -139,7 +140,7 @@ class JoiningBaseTest extends BaseTokenRangeIntegrationTest splitRanges, expectedRangeMappings); - context.completeNow(); + completeContextOrThrow(context); }); } finally diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java index 3b1065a..b550a9c 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -45,6 +46,7 @@ import org.apache.cassandra.testing.ConfigurableCassandraTestContext; * Note: Some related test classes are broken down to have a single test case to parallelize test execution and * therefore limit the instance size required to run the tests from CircleCI as the in-jvm-dtests tests are memory bound */ +@Tag("heavy") @ExtendWith(VertxExtension.class) public class JoiningTestDoubleCluster extends JoiningBaseTest { @@ -184,7 +186,7 @@ public class JoiningTestDoubleCluster extends JoiningBaseTest { // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java index 16eb43f..bff3ca5 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -46,6 +47,7 @@ import org.apache.cassandra.testing.ConfigurableCassandraTestContext; * Note: Some related test classes are broken down to have a single test case to parallelize test execution and * therefore limit the instance size required to run the tests from CircleCI as the in-jvm-dtests tests are memory bound */ +@Tag("heavy") @ExtendWith(VertxExtension.class) public class JoiningTestMultiDC extends JoiningBaseTest { @@ -240,7 +242,7 @@ public class JoiningTestMultiDC extends JoiningBaseTest { // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java index 170f8c6..33ecf0f 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -46,6 +47,7 @@ import org.apache.cassandra.testing.ConfigurableCassandraTestContext; * Note: Some related test classes are broken down to have a single test case to parallelize test execution and * therefore limit the instance size required to run the tests from CircleCI as the in-jvm-dtests tests are memory bound */ +@Tag("heavy") @ExtendWith(VertxExtension.class) public class JoiningTestMultiDCSingleReplicated extends JoiningBaseTest { @@ -169,7 +171,7 @@ public class JoiningTestMultiDCSingleReplicated extends JoiningBaseTest { // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java index 6f2fda6..31da33a 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -45,6 +46,7 @@ import org.apache.cassandra.testing.ConfigurableCassandraTestContext; * Note: Some related test classes are broken down to have a single test case to parallelize test execution and * therefore limit the instance size required to run the tests from CircleCI as the in-jvm-dtests tests are memory bound */ +@Tag("heavy") @ExtendWith(VertxExtension.class) public class JoiningTestMultipleNodes extends JoiningBaseTest { @@ -156,7 +158,7 @@ public class JoiningTestMultipleNodes extends JoiningBaseTest { // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java index 0f3916e..03bed02 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java @@ -123,7 +123,7 @@ public class JoiningTestSingleNode extends JoiningBaseTest { // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingBaseTest.java index 287d9e9..3153c90 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingBaseTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingBaseTest.java @@ -79,12 +79,13 @@ class LeavingBaseTest extends BaseTokenRangeIntegrationTest for (int i = 0; i < leavingNodesPerDC * annotation.numDcs(); i++) { IUpgradeableInstance node = cluster.get(cluster.size() - i); - new Thread(() -> node.nodetoolResult("decommission").asserts().success()).start(); + startAsync("Decommission node" + node.config().num(), + () -> node.nodetoolResult("decommission").asserts().success()); leavingNodes.add(node); } // Wait until nodes have reached expected state - awaitLatchOrTimeout(transientStateStart, 2, TimeUnit.MINUTES); + awaitLatchOrThrow(transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); for (IUpgradeableInstance node : leavingNodes) { @@ -108,7 +109,7 @@ class LeavingBaseTest extends BaseTokenRangeIntegrationTest validateTokenRanges(mappingResponse, generateExpectedRanges()); validateReplicaMapping(mappingResponse, leavingNodes, expectedRangeMappings); - context.completeNow(); + completeContextOrThrow(context); }); } finally diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java index dde164b..0743aa4 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -52,6 +53,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; /** * Cluster shrink scenarios integration tests for token range replica mapping endpoint with the in-jvm dtest framework. */ +@Tag("heavy") @ExtendWith(VertxExtension.class) class LeavingTest extends LeavingBaseTest { @@ -287,7 +289,7 @@ class LeavingTest extends LeavingBaseTest public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception { transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); orig.call(); } @@ -329,7 +331,7 @@ class LeavingTest extends LeavingBaseTest public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception { transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); orig.call(); } @@ -371,7 +373,7 @@ class LeavingTest extends LeavingBaseTest public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception { transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java index eb82353..cf55ca1 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -50,6 +51,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; * Multi-DC Cluster shrink scenarios integration tests for token range replica mapping endpoint with the in-jvm * dtest framework. */ +@Tag("heavy") @ExtendWith(VertxExtension.class) class LeavingTestMultiDC extends LeavingBaseTest { @@ -182,7 +184,7 @@ class LeavingTestMultiDC extends LeavingBaseTest public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception { transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java index d193613..30681a2 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java @@ -28,9 +28,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; -import org.junit.jupiter.api.extension.ExtendWith; -import io.vertx.junit5.VertxExtension; +import org.junit.jupiter.api.Disabled; + import io.vertx.junit5.VertxTestContext; import net.bytebuddy.ByteBuddy; import net.bytebuddy.description.type.TypeDescription; @@ -41,7 +41,6 @@ import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; import org.apache.cassandra.distributed.UpgradeableCluster; -import org.apache.cassandra.testing.CassandraIntegrationTest; import org.apache.cassandra.testing.ConfigurableCassandraTestContext; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -53,10 +52,14 @@ import static net.bytebuddy.matcher.ElementMatchers.named; * Note: Some related test classes are broken down to have a single test case to parallelize test execution and * therefore limit the instance size required to run the tests from CircleCI as the in-jvm-dtests tests are memory bound */ -@ExtendWith(VertxExtension.class) +// TODO: the test needs rework; disable it for now +//@Tag("heavy") +//@ExtendWith(VertxExtension.class) +@Disabled("This test currently shinks the cluster without any replica-safe awareness " + + "causing problems when nodes are streaming data to other nodes that have already started leaving") class LeavingTestMultiDCHalveCluster extends LeavingBaseTest { - @CassandraIntegrationTest(nodesPerDc = 6, numDcs = 2, network = true, buildCluster = false) +// @CassandraIntegrationTest(nodesPerDc = 6, numDcs = 2, network = true, buildCluster = false) void retrieveMappingMultiDCHalveClusterSize(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws Exception { @@ -215,7 +218,7 @@ class LeavingTestMultiDCHalveCluster extends LeavingBaseTest public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception { transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java index 52a7d6c..aff80a5 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java @@ -82,12 +82,13 @@ class MovingBaseTest extends BaseTokenRangeIntegrationTest int movingNodeIndex = (annotation.numDcs() > 1) ? MULTIDC_MOVING_NODE_IDX : MOVING_NODE_IDX; IUpgradeableInstance movingNode = cluster.get(movingNodeIndex); - new Thread(() -> movingNode.nodetoolResult("move", "--", Long.toString(moveTargetToken)) + startAsync("move token of node" + movingNode.config().num() + " to " + moveTargetToken, + () -> movingNode.nodetoolResult("move", "--", Long.toString(moveTargetToken)) .asserts() - .success()).start(); + .success()); // Wait until nodes have reached expected state - awaitLatchOrTimeout(transientStateStart, 2, TimeUnit.MINUTES); + awaitLatchOrThrow(transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); ClusterUtils.awaitRingState(seed, movingNode, "Moving"); retrieveMappingWithKeyspace(context, TEST_KEYSPACE, response -> { @@ -106,7 +107,7 @@ class MovingBaseTest extends BaseTokenRangeIntegrationTest validateTokenRanges(mappingResponse, expectedRanges); validateReplicaMapping(mappingResponse, movingNode, moveTargetToken, expectedRangeMappings); - context.completeNow(); + completeContextOrThrow(context); }); } finally diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java index 98c596a..1e46d6e 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -51,6 +52,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; * Multi-DC Node movement scenarios integration tests for token range replica mapping endpoint with the in-jvm * dtest framework. */ +@Tag("heavy") @ExtendWith(VertxExtension.class) class MovingMultiDCTest extends MovingBaseTest { @@ -199,7 +201,7 @@ class MovingMultiDCTest extends MovingBaseTest { Future<?> res = orig.call(); transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); return res; } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java index f40322e..86dc494 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java @@ -159,7 +159,7 @@ class MovingTest extends MovingBaseTest { Future<?> res = orig.call(); transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); return res; } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java index ca588eb..15eece3 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java @@ -101,7 +101,7 @@ class ReplacementBaseTest extends BaseTokenRangeIntegrationTest List<IUpgradeableInstance> newNodes = startReplacementNodes(nodeStart, cluster, nodesToRemove); sidecarTestContext.refreshInstancesConfig(); // Wait until replacement nodes are in JOINING state - awaitLatchOrTimeout(transientStateStart, 2, TimeUnit.MINUTES); + awaitLatchOrThrow(transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); // Verify state of replacement nodes for (IUpgradeableInstance newInstance : newNodes) @@ -135,9 +135,9 @@ class ReplacementBaseTest extends BaseTokenRangeIntegrationTest int nodeCount = annotation.nodesPerDc() * annotation.numDcs(); validateTokenRanges(mappingResponse, generateExpectedRanges(nodeCount)); - validateReplicaMapping(mappingResponse, newNodes, expectedRangeMappings); - context.completeNow(); + + completeContextOrThrow(context); }); } finally @@ -174,20 +174,21 @@ class ReplacementBaseTest extends BaseTokenRangeIntegrationTest c.set("storage_port", remPort); }); - new Thread(() -> ClusterUtils.start(replacement, (properties) -> { - properties.set(CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK, true); - properties.set(CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS, - TimeUnit.SECONDS.toMillis(10L)); - properties.with("cassandra.broadcast_interval_ms", - Long.toString(TimeUnit.SECONDS.toMillis(30L))); - properties.with("cassandra.ring_delay_ms", - Long.toString(TimeUnit.SECONDS.toMillis(10L))); - // This property tells cassandra that this new instance is replacing the node with - // address remAddress and port remPort - properties.with("cassandra.replace_address_first_boot", remAddress + ":" + remPort); - })).start(); - - awaitLatchOrTimeout(nodeStart, 2, TimeUnit.MINUTES); + startAsync("Start replacement node node" + replacement.config().num(), + () -> ClusterUtils.start(replacement, (properties) -> { + properties.set(CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK, true); + properties.set(CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS, + TimeUnit.SECONDS.toMillis(10L)); + properties.with("cassandra.broadcast_interval_ms", + Long.toString(TimeUnit.SECONDS.toMillis(30L))); + properties.with("cassandra.ring_delay_ms", + Long.toString(TimeUnit.SECONDS.toMillis(10L))); + // This property tells cassandra that this new instance is replacing the node with + // address remAddress and port remPort + properties.with("cassandra.replace_address_first_boot", remAddress + ":" + remPort); + })); + + awaitLatchOrThrow(nodeStart, 2, TimeUnit.MINUTES, "nodeStart"); newNodes.add(replacement); } return newNodes; diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java index 104e8e6..e0cf562 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.collect.Range; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; @@ -44,6 +45,7 @@ import org.apache.cassandra.testing.ConfigurableCassandraTestContext; * Multi-DC Host replacement scenario integration tests for token range replica mapping endpoint with the in-jvm * dtest framework. */ +@Tag("heavy") @ExtendWith(VertxExtension.class) class ReplacementMultiDCTest extends ReplacementBaseTest { @@ -179,7 +181,7 @@ class ReplacementMultiDCTest extends ReplacementBaseTest nodeStart.countDown(); // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java index 5206353..4115ef8 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java @@ -161,7 +161,7 @@ class ReplacementTest extends ReplacementBaseTest nodeStart.countDown(); // trigger bootstrap start and wait until bootstrap is ready from test transientStateStart.countDown(); - awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES); + awaitLatchOrTimeout(transientStateEnd, 2, TimeUnit.MINUTES, "transientStateEnd"); } orig.call(); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java index 2755aec..7e52d4a 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java +++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java @@ -74,6 +74,9 @@ public abstract class IntegrationTestBase { protected static final String TEST_KEYSPACE = "testkeyspace"; protected static final int DEFAULT_RF = 3; + protected static final String WITH_COMPACTION_DISABLED = " WITH COMPACTION = {\n" + + " 'class': 'SizeTieredCompactionStrategy', \n" + + " 'enabled': 'false' }"; private static final String TEST_TABLE_PREFIX = "testtable"; private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0); protected Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -82,10 +85,12 @@ public abstract class IntegrationTestBase protected WebClient client; protected CassandraSidecarTestContext sidecarTestContext; protected Injector injector; + private final List<Throwable> testExceptions = new ArrayList<>(); @BeforeEach void setup(AbstractCassandraTestContext cassandraTestContext, TestInfo testInfo) throws InterruptedException { + testExceptions.clear(); IntegrationTestModule integrationTestModule = new IntegrationTestModule(); System.setProperty("cassandra.testtag", testInfo.getTestClass().get().getCanonicalName()); System.setProperty("suitename", testInfo.getDisplayName() + ": " + cassandraTestContext.version); @@ -240,13 +245,33 @@ public abstract class IntegrationTestBase return tableName; } - protected static void awaitLatchOrTimeout(CountDownLatch latch, long duration, TimeUnit timeUnit) + // similar to awaitLatchOrTimeout, it throws either test exceptions (due to startAsync failures) or timeout exception + protected void awaitLatchOrThrow(CountDownLatch latch, long duration, TimeUnit timeUnit, String latchName) + { + String hint = latchName == null ? "" : '(' + latchName + ')'; + boolean completed = Uninterruptibles.awaitUninterruptibly(latch, duration, timeUnit); + if (completed) + { + return; + } + + throwIfHasTestExceptions(); + throw new AssertionError("Latch " + hint + " times out after " + duration + ' ' + timeUnit.name()); + } + + protected static void awaitLatchOrTimeout(CountDownLatch latch, long duration, TimeUnit timeUnit, String latchName) { + String hint = latchName == null ? "" : '(' + latchName + ')'; assertThat(Uninterruptibles.awaitUninterruptibly(latch, duration, timeUnit)) - .describedAs("Latch times out after " + duration + ' ' + timeUnit.name()) + .describedAs("Latch " + hint + " times out after " + duration + ' ' + timeUnit.name()) .isTrue(); } + protected static void awaitLatchOrTimeout(CountDownLatch latch, long duration, TimeUnit timeUnit) + { + awaitLatchOrTimeout(latch, duration, timeUnit, null); + } + protected Session maybeGetSession() { Session session = sidecarTestContext.session(); @@ -254,6 +279,40 @@ public abstract class IntegrationTestBase return session; } + protected void startAsync(String hints, Runnable runnable) + { + new Thread(() -> { + try + { + runnable.run(); + } + catch (Throwable t) + { + testExceptions.add(new RuntimeException(hints, t)); + } + }).start(); + } + + protected void throwIfHasTestExceptions() + { + if (testExceptions.isEmpty()) + return; + + RuntimeException ex = new RuntimeException("Exceptions from async execution, i.e. IntegrationTestBase#startAsync. See the suppressed exceptions"); + for (Throwable t : testExceptions) + { + ex.addSuppressed(t); + } + throw ex; + } + + protected void completeContextOrThrow(VertxTestContext context) + { + throwIfHasTestExceptions(); + context.completeNow(); + } + + private static QualifiedTableName uniqueTestTableFullName(String tablePrefix) { String unquotedTableName = tablePrefix + TEST_TABLE_ID.getAndIncrement(); @@ -261,12 +320,17 @@ public abstract class IntegrationTestBase new Name(unquotedTableName, Metadata.quoteIfNecessary(unquotedTableName))); } - public List<Path> findChildFile(CassandraSidecarTestContext context, String hostname, String target) + /** + * Note: must disable compaction, otherwise the file tree can be mutated while walking and test becomes flaky + * Append WITH_COMPACTION_DISABLED to the table create statement + */ + public List<Path> findChildFile(CassandraSidecarTestContext context, String hostname, String keyspaceName, String target) { InstanceMetadata instanceConfig = context.instancesConfig().instanceFromHost(hostname); List<String> parentDirectories = instanceConfig.dataDirs(); - return parentDirectories.stream().flatMap(s -> findChildFile(Paths.get(s), target).stream()) + return parentDirectories.stream() + .flatMap(s -> findChildFile(Paths.get(s, keyspaceName), target).stream()) .collect(Collectors.toList()); } diff --git a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java index c63809f..259ea1d 100644 --- a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java +++ b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java @@ -18,6 +18,8 @@ package org.apache.cassandra.testing; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import org.slf4j.Logger; @@ -34,6 +36,7 @@ public abstract class AbstractCassandraTestContext implements AutoCloseable private static final Logger LOGGER = LoggerFactory.getLogger(AbstractCassandraTestContext.class); public final SimpleCassandraVersion version; + private final Map<String, String> initialProperties; protected UpgradeableCluster cluster; public CassandraIntegrationTest annotation; @@ -45,13 +48,13 @@ public abstract class AbstractCassandraTestContext implements AutoCloseable this.version = version; this.cluster = cluster; this.annotation = annotation; + this.initialProperties = systemStringProperties(); } public AbstractCassandraTestContext(SimpleCassandraVersion version, CassandraIntegrationTest annotation) { - this.version = version; - this.annotation = annotation; + this(version, null, annotation); } public UpgradeableCluster cluster() @@ -82,11 +85,49 @@ public abstract class AbstractCassandraTestContext implements AutoCloseable throw t; } } + finally + { + LOGGER.info("Restoring system properties"); + restoreSystemProperties(); + } } } + private void restoreSystemProperties() + { + Map<String, String> currentProps = systemStringProperties(); + currentProps.forEach((k, v) -> { + String initialValue = initialProperties.get(k); + if (initialValue == null) + { + System.clearProperty(k); // remove the added property during test + } + else if (!v.equals(initialValue)) + { + System.setProperty(k, initialValue); // restore to the initial value + } + else + { + // property is not changed, do nothing + } + }); + } + public int clusterSize() { return annotation.numDcs() * (annotation.nodesPerDc() + annotation.newNodesPerDc()); } + + // return a copy of the current system string properties + private Map<String, String> systemStringProperties() + { + Map<String, String> props = new HashMap<>(); + System.getProperties().forEach((k, v) -> { + if (k instanceof String && v instanceof String) + { + props.put((String) k, (String) v); + } + }); + return props; + } } diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java b/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java index fba4f6a..e14384f 100644 --- a/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java +++ b/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java @@ -19,6 +19,7 @@ package org.apache.cassandra.testing; import java.lang.reflect.AnnotatedElement; +import java.net.BindException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -26,6 +27,7 @@ import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.Extension; @@ -44,6 +46,7 @@ import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.impl.AbstractCluster; import org.apache.cassandra.distributed.shared.Versions; import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.apache.cassandra.utils.Throwables; /** @@ -176,10 +179,13 @@ public class CassandraTestTemplate implements TestTemplateInvocationContextProvi if (annotation.buildCluster()) { UpgradeableCluster cluster; - cluster = clusterBuilder.createWithoutStarting(); if (annotation.startCluster()) { - cluster.startup(); + cluster = retriableStartCluster(clusterBuilder, 3); + } + else + { + cluster = clusterBuilder.createWithoutStarting(); } cassandraTestContext = new CassandraTestContext(versionParsed, cluster, annotation); } @@ -305,6 +311,39 @@ public class CassandraTestTemplate implements TestTemplateInvocationContextProvi } } + public static UpgradeableCluster retriableStartCluster(UpgradeableCluster.Builder builder, int maxAttempts) + { + Throwable lastCuase = null; + for (int i = 0; i < maxAttempts; i++) + { + try + { + return builder.start(); + } + catch (Throwable cause) + { + boolean addressAlreadyInUse = Throwables.anyCauseMatches(cause, CassandraTestTemplate::portNotAvailableToBind); + if (addressAlreadyInUse) + { + LOGGER.warn("Failed to provision cluster due to port collision after {} retries", i, cause); + lastCuase = cause; + } + else + { + throw new RuntimeException("Failed to provision cluster", cause); + } + } + } + + throw new RuntimeException("Failed to providiosn cluster after exhausting all attempts", lastCuase); + } + + private static boolean portNotAvailableToBind(Throwable cause) + { + return (cause instanceof BindException && StringUtils.contains(cause.getMessage(), "Address already in use")) + || StringUtils.contains(cause.getMessage(), "is in use by another process"); + } + static { // Settings to reduce the test setup delay incurred if gossip is enabled diff --git a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java index b43f3ac..a82f06d 100644 --- a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java +++ b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java @@ -18,17 +18,9 @@ package org.apache.cassandra.testing; -import java.io.IOException; -import java.net.BindException; import java.util.function.Consumer; -import org.apache.commons.lang3.StringUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.distributed.UpgradeableCluster; -import org.apache.cassandra.utils.Throwables; /** * A Cassandra Test Context implementation that allows advanced cluster configuration before cluster creation @@ -36,12 +28,6 @@ import org.apache.cassandra.utils.Throwables; */ public class ConfigurableCassandraTestContext extends AbstractCassandraTestContext { - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurableCassandraTestContext.class); - - public static final String BUILT_CLUSTER_CANNOT_BE_CONFIGURED_ERROR = - "Cannot configure a cluster after it is built. Please set the buildCluster annotation attribute to false, " - + "and do not call `getCluster` before calling this method."; - private final UpgradeableCluster.Builder builder; public ConfigurableCassandraTestContext(SimpleCassandraVersion version, @@ -52,48 +38,13 @@ public class ConfigurableCassandraTestContext extends AbstractCassandraTestConte this.builder = builder; } - public UpgradeableCluster configureCluster(Consumer<UpgradeableCluster.Builder> configurator) throws IOException + public UpgradeableCluster configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator) { - if (cluster != null) - { - throw new IllegalStateException(BUILT_CLUSTER_CANNOT_BE_CONFIGURED_ERROR); - } configurator.accept(builder); - cluster = builder.createWithoutStarting(); + cluster = CassandraTestTemplate.retriableStartCluster(builder, 3); return cluster; } - public UpgradeableCluster configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator) - throws IOException - { - RuntimeException exception = null; - for (int i = 0; i < 3; i++) - { - try - { - cluster = null; // make sure cluster is null - cluster = configureCluster(configurator); - cluster.startup(); - return cluster; - } - catch (RuntimeException ret) - { - exception = ret; - boolean addressAlreadyInUse = Throwables.anyCauseMatches(exception, this::portNotAvailableToBind); - if (addressAlreadyInUse) - { - LOGGER.warn("Failed to provision cluster after {} retries", i, exception); - } - else - { - throw exception; - } - - } - } - throw exception; - } - @Override public String toString() { @@ -102,10 +53,4 @@ public class ConfigurableCassandraTestContext extends AbstractCassandraTestConte + ", builder=" + builder + '}'; } - - private boolean portNotAvailableToBind(Throwable ex) - { - return ex instanceof BindException && - StringUtils.contains(ex.getMessage(), "Address already in use"); - } } diff --git a/src/test/java/org/apache/cassandra/sidecar/AssertionUtils.java b/src/test/java/org/apache/cassandra/sidecar/AssertionUtils.java index 932c573..154550f 100644 --- a/src/test/java/org/apache/cassandra/sidecar/AssertionUtils.java +++ b/src/test/java/org/apache/cassandra/sidecar/AssertionUtils.java @@ -41,6 +41,17 @@ public class AssertionUtils * @param assertions assertions */ public static void loopAssert(int timeoutSeconds, Runnable assertions) + { + loopAssert(timeoutSeconds, 100, assertions); + } + + /** + * Run the assertions in a loop until the first success within the timeout. + * Otherwise, it fails with the last assertion failure. + * @param timeoutSeconds timeout + * @param assertions assertions + */ + public static void loopAssert(int timeoutSeconds, int delayMillis, Runnable assertions) { long start = System.nanoTime(); long timeout = TimeUnit.SECONDS.toNanos(timeoutSeconds); @@ -56,7 +67,7 @@ public class AssertionUtils { failure = error; } - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(delayMillis, TimeUnit.MILLISECONDS); } // times out if (failure != null) diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java index 60f7647..51ad81e 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java @@ -24,9 +24,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -50,6 +48,7 @@ import org.apache.cassandra.sidecar.exceptions.RestoreJobException; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.server.MainModule; +import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -85,7 +84,8 @@ class RestoreJobManagerTest manager = new RestoreJobManager(restoreJobConfiguration, instanceMetadata, executorPools, - processor); + processor, + false /* do not trigger the first deletion */); } @AfterEach @@ -213,31 +213,31 @@ class RestoreJobManagerTest } @Test - void testDeleteObsoleteData() throws ExecutionException, InterruptedException, TimeoutException, IOException + void testDeleteObsoleteData() throws IOException { - Path oldJobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis() + long nowMillis = System.currentTimeMillis(); + Path oldJobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(nowMillis - TimeUnit.DAYS.toMillis(jobRecencyDays) - 1))); createFileInDirectory(oldJobDir, 5); Path olderJobDir - = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis() + = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(nowMillis - TimeUnit.DAYS.toMillis(jobRecencyDays + 1)))); createFileInDirectory(olderJobDir, 5); - Path newJobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis()))); + Path newJobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(nowMillis))); createFileInDirectory(newJobDir, 5); - manager.deleteObsoleteDataAsync() - .toCompletionStage() - .toCompletableFuture() - .get(5, TimeUnit.SECONDS); - assertThat(Files.exists(oldJobDir)).describedAs("Should be deleted").isFalse(); - assertThat(Files.exists(olderJobDir)).describedAs("Should be deleted").isFalse(); - assertThat(Files.exists(newJobDir)).describedAs("Should survive").isTrue(); - assertThat(newJobDir.toFile().list()) - .describedAs("Should have 5 files intact") - .hasSize(5); + manager.deleteObsoleteDataAsync(); + loopAssert(3, 10, () -> { + assertThat(Files.exists(oldJobDir)).describedAs("Should be deleted").isFalse(); + assertThat(Files.exists(olderJobDir)).describedAs("Should be deleted").isFalse(); + assertThat(Files.exists(newJobDir)).describedAs("Should survive").isTrue(); + assertThat(newJobDir.toFile().list()) + .describedAs("Should have 5 files intact") + .hasSize(5); + }); } private RestoreSlice getTestSlice() @@ -276,8 +276,10 @@ class RestoreJobManagerTest { for (int i = 0; i < nFiles; i++) { - Files.createFile(Paths.get(path.toString(), UUID.randomUUID().toString())); + Files.createFile(Paths.get(path.toString(), "file" + i)); } - assertThat(path.toFile().list()).hasSize(nFiles); + assertThat(path.toFile().list()) + .describedAs("listing files in " + path.toAbsolutePath()) + .hasSize(nFiles); } } diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java index 5eb7faa..f533aa7 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java @@ -402,12 +402,24 @@ class SSTableUploadHandlerTest extends BaseUploadsHandlerTest { if (expectTimeout) { - assertThat(response.failed()).isTrue(); - context.completeNow(); + if (response.failed()) + { + context.completeNow(); + } + else + { + context.failNow("Expect timeout, but passed"); + } return; } HttpResponse<Buffer> httpResponse = response.result(); + if (httpResponse == null) + { + context.failNow(new RuntimeException("Failed to get response. Expect status code: " + expectedRetCode, response.cause())); + return; + } + if (httpResponse.statusCode() != expectedRetCode) { context.failNow("Status code mismatched. Expected: " + expectedRetCode + @@ -418,7 +430,7 @@ class SSTableUploadHandlerTest extends BaseUploadsHandlerTest UploadSSTableMetrics.UploadSSTableComponentMetrics componentMetrics = uploadMetrics.forComponent("Data.db"); if (expectedRetCode == HttpResponseStatus.TOO_MANY_REQUESTS.code()) { - assertThat(uploadMetrics.throttled.metric.getValue()).isOne(); + context.verify(() -> assertThat(uploadMetrics.throttled.metric.getValue()).isOne()); } if (responseValidator != null) @@ -438,9 +450,9 @@ class SSTableUploadHandlerTest extends BaseUploadsHandlerTest { Path targetFilePath = Paths.get(SnapshotUtils.makeStagingDir(canonicalTemporaryPath), uploadId, keyspace, tableName, targetFileName); - assertThat(Files.exists(targetFilePath)).isTrue(); try { + assertThat(Files.exists(targetFilePath)).isTrue(); long expectedSize = Files.size(targetFilePath); assertThat(componentMetrics.bytesUploadedRate.metric.getCount()).isEqualTo(expectedSize); assertThat(uploadMetrics.totalBytesUploadedRate.metric.getCount()).isEqualTo(Files.size(targetFilePath)); diff --git a/src/test/resources/logback-sidecar.xml b/src/test/resources/logback-sidecar.xml new file mode 100644 index 0000000..1dd09a0 --- /dev/null +++ b/src/test/resources/logback-sidecar.xml @@ -0,0 +1,36 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration scan="true" scanPeriod="60 seconds"> + <jmxConfigurator /> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + <encoder> + <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + </appender> + + <root level="DEBUG"> + <appender-ref ref="STDOUT" /> + </root> + + <logger name="io.netty.handler.logging.LoggingHandler" level="info" /> +</configuration> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org