This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 72e38ea6 CASSSIDECAR-204: Improve integration test stability (#188)
72e38ea6 is described below
commit 72e38ea6eb181e6f1e60855b82f38dde13a8a812
Author: Yifan Cai <[email protected]>
AuthorDate: Mon Feb 10 10:10:52 2025 -0800
CASSSIDECAR-204: Improve integration test stability (#188)
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSSIDECAR-204
---
.circleci/config.yml | 4 +
adapters/base/build.gradle | 1 +
adapters/cassandra41/build.gradle | 1 +
client-common/build.gradle | 1 +
.../common/response/data/StreamsProgressStats.java | 15 ++
client/build.gradle | 1 +
gradle/common/integrationTestTask.gradle | 1 +
.../testing/SharedClusterIntegrationTestBase.java | 13 +-
.../routes/SchemaHandlerIntegrationTest.java | 102 ++++++++++
server-common/build.gradle | 1 +
server/build.gradle | 2 +
.../driver/SidecarLoadBalancingPolicyTest.java | 4 +-
.../sidecar/common/CQLSessionProviderTest.java | 4 +-
.../sidecar/common/DelegateIntegrationTest.java | 87 ++++++---
.../ClusterLeaseClaimTaskIntegrationTest.java | 4 +-
.../cassandra/sidecar/db/SidecarSchemaIntTest.java | 65 -------
...ConnectedClientStatsHandlerIntegrationTest.java | 8 +-
.../routes/SchemaHandlerIntegrationTest.java | 130 -------------
.../sidecar/routes/StreamStatsIntegrationTest.java | 209 +++++++++------------
.../SSTableImportHandlerIntegrationTest.java | 3 +-
.../routes/tokenrange/BasicMultiDCRf3Test.java | 2 +
.../testing/CassandraSidecarTestContext.java | 65 +++++--
.../sidecar/testing/IntegrationTestBase.java | 88 +++++----
.../sidecar/testing/IntegrationTestModule.java | 60 +++++-
.../testing/AbstractCassandraTestContext.java | 22 ++-
.../cassandra/testing/CassandraTestContext.java | 3 +-
.../cassandra/testing/CassandraTestTemplate.java | 20 +-
.../testing/ConfigurableCassandraTestContext.java | 4 +-
server/src/test/resources/logback-in-jvm-dtest.xml | 11 +-
vertx-auth-mtls/build.gradle | 1 +
vertx-client-shaded/build.gradle | 1 +
vertx-client/build.gradle | 1 +
32 files changed, 486 insertions(+), 448 deletions(-)
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 47e509a5..1421a345 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -89,6 +89,10 @@ jobs:
path: build/reports
destination: test-reports
+ - store_artifacts:
+ path: build/test-results/
+ destination: test-results
+
- store_test_results:
path: build/test-results/
diff --git a/adapters/base/build.gradle b/adapters/base/build.gradle
index ad379914..ce80e8e7 100644
--- a/adapters/base/build.gradle
+++ b/adapters/base/build.gradle
@@ -52,6 +52,7 @@ test {
println("Destination directory for adapters-base tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git a/adapters/cassandra41/build.gradle
b/adapters/cassandra41/build.gradle
index 190e09ea..bc8f48ce 100644
--- a/adapters/cassandra41/build.gradle
+++ b/adapters/cassandra41/build.gradle
@@ -47,6 +47,7 @@ test {
println("Destination directory for adapters-cassandra41 tests:
${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git a/client-common/build.gradle b/client-common/build.gradle
index b2bb3cdb..605c5d1a 100644
--- a/client-common/build.gradle
+++ b/client-common/build.gradle
@@ -48,6 +48,7 @@ test {
println("Destination directory for client-common tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
index 88d74077..9b1d990d 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
@@ -104,4 +104,19 @@ public class StreamsProgressStats
{
return totalBytesSent;
}
+
+ @Override
+ public String toString()
+ {
+ return "StreamsProgressStats{" +
+ "totalFilesToReceive=" + totalFilesToReceive +
+ ", totalFilesReceived=" + totalFilesReceived +
+ ", totalBytesToReceive=" + totalBytesToReceive +
+ ", totalBytesReceived=" + totalBytesReceived +
+ ", totalFilesToSend=" + totalFilesToSend +
+ ", totalFilesSent=" + totalFilesSent +
+ ", totalBytesToSend=" + totalBytesToSend +
+ ", totalBytesSent=" + totalBytesSent +
+ '}';
+ }
}
diff --git a/client/build.gradle b/client/build.gradle
index 194c6b66..1f886038 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -55,6 +55,7 @@ test {
println("Destination directory for client tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git a/gradle/common/integrationTestTask.gradle
b/gradle/common/integrationTestTask.gradle
index 38547b61..c0ce425b 100644
--- a/gradle/common/integrationTestTask.gradle
+++ b/gradle/common/integrationTestTask.gradle
@@ -66,6 +66,7 @@ apply from:
"${project.rootDir}/gradle/common/java11Options.gradle"
println("Destination directory for integration tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
testLogging {
events "started", "passed", "skipped", "failed"
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index e14977db..09f00803 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -156,6 +156,7 @@ public abstract class SharedClusterIntegrationTestBase
protected Server server;
protected TestVersion testVersion;
protected MtlsTestHelper mtlsTestHelper;
+ private final CountDownLatch sidecarSchemaReadyLatch = new
CountDownLatch(1);
private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
private Injector sidecarServerInjector;
@@ -350,6 +351,9 @@ public abstract class SharedClusterIntegrationTestBase
AbstractModule testModule = new IntegrationTestModule(instances,
classLoaderWrapper, mtlsTestHelper,
dnsResolver,
configurationOverrides());
sidecarServerInjector = Guice.createInjector(Modules.override(new
MainModule()).with(testModule));
+ Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
+ vertx.eventBus()
+
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg
-> sidecarSchemaReadyLatch.countDown());
Server sidecarServer = sidecarServerInjector.getInstance(Server.class);
sidecarServer.start()
.onSuccess(s -> context.completeNow())
@@ -362,15 +366,10 @@ public abstract class SharedClusterIntegrationTestBase
protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
{
assertThat(sidecarServerInjector)
- .describedAs("Sidecar is started")
+ .describedAs("Sidecar should be started")
.isNotNull();
- CountDownLatch latch = new CountDownLatch(1);
- Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
- vertx.eventBus()
-
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg
-> latch.countDown());
-
- assertThat(Uninterruptibles.awaitUninterruptibly(latch, timeout,
timeUnit))
+
assertThat(Uninterruptibles.awaitUninterruptibly(sidecarSchemaReadyLatch,
timeout, timeUnit))
.describedAs("Sidecar schema is not initialized after " + timeout + '
' + timeUnit)
.isTrue();
}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
new file mode 100644
index 00000000..c1c1e044
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import org.apache.cassandra.sidecar.common.response.SchemaResponse;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SchemaHandlerIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace("testkeyspace", Map.of("replication_factor", 1));
+ createTestKeyspace("\"Cycling\"", Map.of("replication_factor", 1));
+ createTestKeyspace("\"keyspace\"", Map.of("replication_factor", 1));
+ }
+
+ @Test
+ void testListKeyspaces()
+ {
+ String testRoute = "/api/v1/schema/keyspaces";
+ SchemaResponse response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+
.expect(ResponsePredicate.SC_OK)
+ .send())
+ .bodyAsJson(SchemaResponse.class);
+ assertThat(response).isNotNull();
+ assertThat(response.keyspace()).isNull();
+ assertThat(response.schema()).isNotNull();
+ }
+
+ @Test
+ void testSchemaHandlerKeyspaceDoesNotExist()
+ {
+ String testRoute = "/api/v1/schema/keyspaces/non_existent";
+ getBlocking(trustedClient().get(server.actualPort(), "localhost",
testRoute)
+ .expect(ResponsePredicate.SC_NOT_FOUND)
+ .send());
+ }
+
+ @Test
+ void testSchemaHandlerWithKeyspace()
+ {
+ String testRoute = "/api/v1/schema/keyspaces/testkeyspace";
+ SchemaResponse response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+
.expect(ResponsePredicate.SC_OK)
+ .send())
+ .bodyAsJson(SchemaResponse.class);
+ assertThat(response).isNotNull();
+ assertThat(response.keyspace()).isEqualTo("testkeyspace");
+ assertThat(response.schema()).isNotNull();
+ }
+
+ @Test
+ void testSchemaHandlerWithCaseSensitiveKeyspace()
+ {
+ String testRoute = "/api/v1/schema/keyspaces/\"Cycling\"";
+ SchemaResponse response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+
.expect(ResponsePredicate.SC_OK)
+ .send())
+ .bodyAsJson(SchemaResponse.class);
+ assertThat(response).isNotNull();
+ assertThat(response.keyspace()).isEqualTo("Cycling");
+ assertThat(response.schema()).isNotNull();
+ }
+
+ @Test
+ void testSchemaHandlerWithReservedKeywordKeyspace()
+ {
+ String testRoute = "/api/v1/schema/keyspaces/\"keyspace\"";
+ SchemaResponse response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+
.expect(ResponsePredicate.SC_OK)
+ .send())
+ .bodyAsJson(SchemaResponse.class);
+ assertThat(response).isNotNull();
+ assertThat(response.keyspace()).isEqualTo("keyspace");
+ assertThat(response.schema()).isNotNull();
+ }
+}
diff --git a/server-common/build.gradle b/server-common/build.gradle
index 755b3dd1..1d900abf 100644
--- a/server-common/build.gradle
+++ b/server-common/build.gradle
@@ -48,6 +48,7 @@ test {
println("Destination directory for server-common tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git a/server/build.gradle b/server/build.gradle
index 54c4c7bb..2cff67d0 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -205,6 +205,7 @@ test {
println("Destination directory for unit tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
testLogging {
events "passed", "skipped", "failed"
@@ -230,6 +231,7 @@ tasks.register("containerTest", Test) {
println("Destination directory for testcontainer tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
testLogging {
events "passed", "skipped", "failed"
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
index 8652c48d..0d94c30d 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
@@ -55,9 +55,9 @@ public class SidecarLoadBalancingPolicyTest extends
IntegrationTestBase
}
@Override
- protected int getNumInstancesToManage(int clusterSize)
+ protected int[] getInstancesToManage(int clusterSize)
{
- return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first
2 instances in the "cluster"
+ return new int[] {1, 2}; // we only want to manage the first 2
instances in the "cluster"
}
@CassandraIntegrationTest(nodesPerDc = 6)
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
index cf361d69..be12ac60 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
@@ -50,9 +50,9 @@ public class CQLSessionProviderTest extends
IntegrationTestBase
public static final String KEYSPACE_FAILED_RESPONSE_START =
"{\"status\":\"Service Unavailable\",";
@Override
- protected int getNumInstancesToManage(int clusterSize)
+ protected int[] getInstancesToManage(int clusterSize)
{
- return 2;
+ return new int[] {1, 2};
}
@CassandraIntegrationTest(nodesPerDc = 2, startCluster = false)
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
index 94426fec..3bada1c4 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.sidecar.common;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import io.vertx.core.AsyncResult;
@@ -58,6 +60,7 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSAND
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -66,6 +69,20 @@ import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(VertxExtension.class)
class DelegateIntegrationTest extends IntegrationTestBase
{
+ private final AtomicReference<Message<JsonObject>>
allCassandraCqlReadyMessage = new AtomicReference<>();
+
+ @BeforeEach
+ void reset()
+ {
+ allCassandraCqlReadyMessage.set(null);
+ }
+
+ @Override
+ protected void beforeServerStart()
+ {
+ vertx.eventBus().localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
allCassandraCqlReadyMessage::set);
+ }
+
@CassandraIntegrationTest()
void testCorrectVersionIsEnabled()
{
@@ -102,8 +119,10 @@ class DelegateIntegrationTest extends IntegrationTestBase
.instanceFromId(instanceId)
.delegate();
- assertThat(delegate).isNotNull();
- assertThat(delegate.isNativeUp()).as("health check fails after
binary has been disabled").isFalse();
+ context.verify(() -> {
+ assertThat(delegate).isNotNull();
+ assertThat(delegate.isNativeUp()).as("health check fails after
binary has been disabled").isFalse();
+ });
cqlDisconnected.flag();
sidecarTestContext.cluster().get(1).nodetool("enablebinary");
});
@@ -113,9 +132,12 @@ class DelegateIntegrationTest extends IntegrationTestBase
CassandraAdapterDelegate delegate =
sidecarTestContext.instancesMetadata()
.instanceFromId(instanceId)
.delegate();
- assertThat(delegate).isNotNull();
- assertThat(delegate.isNativeUp()).as("health check succeeds after
binary has been enabled")
- .isTrue();
+ context.verify(() -> {
+ assertThat(delegate).isNotNull();
+ assertThat(delegate.isNativeUp()).as("health check succeeds
after binary has been enabled")
+ .isTrue();
+ });
+
cqlReady.flag();
});
@@ -130,19 +152,16 @@ class DelegateIntegrationTest extends IntegrationTestBase
}
@CassandraIntegrationTest(nodesPerDc = 3)
- void testAllInstancesHealthCheck(VertxTestContext context)
+ void testAllInstancesHealthCheck()
{
- EventBus eventBus = vertx.eventBus();
- Checkpoint allCqlReady = context.checkpoint();
-
Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
- eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
+ loopAssert(30, 1000, () -> {
+ Message<JsonObject> message = allCassandraCqlReadyMessage.get();
+ assertThat(message).isNotNull();
JsonArray cassandraInstanceIds =
message.body().getJsonArray("cassandraInstanceIds");
assertThat(cassandraInstanceIds).hasSize(3);
assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
.allMatch(expectedCassandraInstanceIds::contains);
-
- allCqlReady.flag();
});
}
@@ -155,31 +174,37 @@ class DelegateIntegrationTest extends IntegrationTestBase
Checkpoint jmxDisconnected = context.checkpoint();
Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
- eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
+ eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
+ context.verify(() -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ assertThat(instanceId).isEqualTo(2);
+
+ buildNativeHealthRequest(client,
instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected));
+ });
+ });
+
+ eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
+ context.verify(() -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ assertThat(instanceId).isEqualTo(2);
+
+ buildJmxHealthRequest(client,
instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected));
+ });
+ });
+
+ loopAssert(30, 1000, () -> {
+ Message<JsonObject> message = allCassandraCqlReadyMessage.get();
+ assertThat(message).isNotNull();
JsonArray cassandraInstanceIds =
message.body().getJsonArray("cassandraInstanceIds");
assertThat(cassandraInstanceIds).hasSize(3);
assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
.allMatch(expectedCassandraInstanceIds::contains);
allCqlReady.flag();
-
// Stop instance 2
ClusterUtils.stopUnchecked(sidecarTestContext.cluster().get(2));
});
- eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
- Integer instanceId =
message.body().getInteger("cassandraInstanceId");
- assertThat(instanceId).isEqualTo(2);
-
- buildNativeHealthRequest(client,
instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected));
- });
-
- eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
- Integer instanceId =
message.body().getInteger("cassandraInstanceId");
- assertThat(instanceId).isEqualTo(2);
-
- buildJmxHealthRequest(client,
instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected));
- });
}
@Timeout(value = 2, timeUnit = TimeUnit.MINUTES)
@@ -187,7 +212,7 @@ class DelegateIntegrationTest extends IntegrationTestBase
public void testChangingClusterSize(VertxTestContext context) throws
InterruptedException
{
// assume the sidecar has 3 managed instances, even though the cluster
only starts with 2 instances initially
- sidecarTestContext.setNumInstancesToManage(3);
+ sidecarTestContext.setInstancesToManage(1, 2, 3);
EventBus eventBus = vertx.eventBus();
@@ -246,7 +271,7 @@ class DelegateIntegrationTest extends IntegrationTestBase
}
else if (upInstanceCount == 3)
{
- assertThat(jmxConnectedInstances).containsExactly(1, 2, 3);
+ context.verify(() ->
assertThat(jmxConnectedInstances).containsExactly(1, 2, 3));
}
}
@@ -258,14 +283,14 @@ class DelegateIntegrationTest extends IntegrationTestBase
int upInstanceCount = nativeConnectedInstances.size();
if (upInstanceCount == 2)
{
- assertThat(nativeConnectedInstances).containsExactly(1, 2);
+ context.verify(() ->
assertThat(nativeConnectedInstances).containsExactly(1, 2));
buildNativeHealthRequest(client,
3).send(assertHealthCheckNotOk(context, notOkCheckpoint));
logger.info("DBG: First two instances connected via native, third
is down");
firstTwoConnected.countDown();
}
else if (upInstanceCount == 3)
{
- assertThat(nativeConnectedInstances).containsExactly(1, 2, 3);
+ context.verify(() ->
assertThat(nativeConnectedInstances).containsExactly(1, 2, 3));
}
}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
index edc7d15b..71fe6b28 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
@@ -436,7 +436,7 @@ class ClusterLeaseClaimTaskIntegrationTest
SimpleQueryResult rows = cluster.getFirstRunningInstance()
.coordinator()
.executeWithResult("SELECT * FROM
sidecar_internal.sidecar_lease_v1 ALLOW FILTERING",
-
ConsistencyLevel.LOCAL_QUORUM);
+
ConsistencyLevel.SERIAL);
return StreamSupport.stream(rows.spliterator(), false).count();
}
@@ -446,7 +446,7 @@ class ClusterLeaseClaimTaskIntegrationTest
cluster.getFirstRunningInstance()
.coordinator()
.execute("SELECT writetime(owner), owner FROM
sidecar_internal.sidecar_lease_v1 WHERE name = 'cluster_lease_holder'",
- ConsistencyLevel.LOCAL_QUORUM);
+ ConsistencyLevel.SERIAL);
assertThat(result).isNotNull();
assertThat(result).hasDimensions(1, 2);
return result;
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
index d2e79f42..98e7fb99 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
@@ -20,22 +20,8 @@ package org.apache.cassandra.sidecar.db;
import java.util.concurrent.TimeUnit;
-import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import io.vertx.core.Vertx;
-import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
-import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
-import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
-import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
-import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask;
-import org.apache.cassandra.sidecar.coordination.ElectorateMembership;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
-import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
import org.apache.cassandra.testing.CassandraIntegrationTest;
@@ -43,57 +29,6 @@ import static org.assertj.core.api.Assertions.assertThat;
class SidecarSchemaIntTest extends IntegrationTestBase
{
- @Override
- protected void beforeSetup()
- {
- installTestSpecificModule(new AbstractModule()
- {
- @Provides
- @Singleton
- public ClusterLease clusterLease()
- {
- // start with INDETERMINATE to compete for a leaseholder
first, then init schema
- return new ClusterLease(ClusterLease.Ownership.INDETERMINATE);
- }
-
- @Provides
- @Singleton
- public CoordinationConfiguration
clusterLeaseClaimTaskConfiguration()
- {
- // increase the claim frequency
- PeriodicTaskConfiguration taskConfig = new
PeriodicTaskConfigurationImpl(true,
-
MillisecondBoundConfiguration.parse("1s"),
-
MillisecondBoundConfiguration.parse("1s"));
- return new CoordinationConfigurationImpl(taskConfig);
- }
-
- @Provides
- @Singleton
- public ClusterLeaseClaimTask clusterLeaseClaimTask(Vertx vertx,
-
ServiceConfiguration serviceConfiguration,
-
ElectorateMembership electorateMembership,
-
SidecarLeaseDatabaseAccessor accessor,
- ClusterLease
clusterLease,
- SidecarMetrics
metrics)
- {
- return new ClusterLeaseClaimTask(vertx,
- serviceConfiguration,
- electorateMembership,
- accessor,
- clusterLease,
- metrics)
- {
- @Override
- public DurationSpec delay()
- {
- // ignore the minimum delay check that is coded in
ClusterLeaseClaimTask
- return MillisecondBoundConfiguration.parse("1s");
- }
- };
- }
- });
- }
-
@CassandraIntegrationTest
void testSidecarSchemaInitializationFromBlank()
{
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
index 0a592bf6..b2ba2b1a 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
@@ -88,6 +88,9 @@ public class ConnectedClientStatsHandlerIntegrationTest
extends IntegrationTestB
createTestKeyspace();
Session session = maybeGetSession();
session.execute("USE " + TEST_KEYSPACE);
+ // create an additional pair of connections
+ sidecarTestContext.buildNewCqlSessionProvider()
+ .get();
Map<String, Boolean> expectedParams =
Collections.singletonMap("summary", false);
String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
@@ -105,8 +108,9 @@ public class ConnectedClientStatsHandlerIntegrationTest
extends IntegrationTestB
void retrieveClientStatsMultipleConnections(VertxTestContext context)
throws Exception
{
- // Creates an additional connection pair
- createTestKeyspace();
+ // create an additional pair of connections
+ sidecarTestContext.buildNewCqlSessionProvider()
+ .get();
Map<String, Boolean> expectedParams =
Collections.singletonMap("summary", false);
String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
testWithClient(context, client -> {
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
deleted file mode 100644
index e985383a..00000000
---
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.sidecar.routes;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import com.datastax.driver.core.Session;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.common.response.SchemaResponse;
-import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
-import org.apache.cassandra.testing.CassandraIntegrationTest;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Integration tests for the {@link SchemaHandler}
- */
-@ExtendWith(VertxExtension.class)
-class SchemaHandlerIntegrationTest extends IntegrationTestBase
-{
- @CassandraIntegrationTest
- void schemaHandlerNoKeyspace(VertxTestContext context) throws Exception
- {
- String testRoute = "/api/v1/schema/keyspaces";
- testWithClient(context, client -> {
- client.get(server.actualPort(), "127.0.0.1", testRoute)
- .expect(ResponsePredicate.SC_OK)
- .send(context.succeeding(response -> {
- SchemaResponse schemaResponse =
response.bodyAsJson(SchemaResponse.class);
- assertThat(schemaResponse).isNotNull();
- assertThat(schemaResponse.keyspace()).isNull();
- assertThat(schemaResponse.schema()).isNotNull();
- context.completeNow();
- }));
- });
- }
-
- @CassandraIntegrationTest
- void schemaHandlerKeyspaceDoesNotExist(VertxTestContext context) throws
Exception
- {
- String testRoute = "/api/v1/schema/keyspaces/non_existent";
- testWithClient(context, client -> {
- client.get(server.actualPort(), "127.0.0.1", testRoute)
- .expect(ResponsePredicate.SC_NOT_FOUND)
- .send(context.succeedingThenComplete());
- });
- }
-
- @CassandraIntegrationTest
- void schemaHandlerWithKeyspace(VertxTestContext context) throws Exception
- {
- createTestKeyspace();
-
- String testRoute = "/api/v1/schema/keyspaces/testkeyspace";
- testWithClient(context, client -> {
- client.get(server.actualPort(), "127.0.0.1", testRoute)
- .expect(ResponsePredicate.SC_OK)
- .send(context.succeeding(response -> {
- SchemaResponse schemaResponse =
response.bodyAsJson(SchemaResponse.class);
- assertThat(schemaResponse).isNotNull();
-
assertThat(schemaResponse.keyspace()).isEqualTo("testkeyspace");
- assertThat(schemaResponse.schema()).isNotNull();
- context.completeNow();
- }));
- });
- }
-
- @CassandraIntegrationTest
- void schemaHandlerWithCaseSensitiveKeyspace(VertxTestContext context)
throws Exception
- {
- try (Session session = maybeGetSession())
- {
- session.execute("CREATE KEYSPACE \"Cycling\"" +
- " WITH REPLICATION = { 'class' :
'NetworkTopologyStrategy', 'replication_factor' : 1 };");
- }
-
- String testRoute = "/api/v1/schema/keyspaces/\"Cycling\"";
- testWithClient(context, client -> {
- client.get(server.actualPort(), "127.0.0.1", testRoute)
- .expect(ResponsePredicate.SC_OK)
- .send(context.succeeding(response -> {
- SchemaResponse schemaResponse =
response.bodyAsJson(SchemaResponse.class);
- assertThat(schemaResponse).isNotNull();
-
assertThat(schemaResponse.keyspace()).isEqualTo("Cycling");
- assertThat(schemaResponse.schema()).isNotNull();
- context.completeNow();
- }));
- });
- }
-
- @CassandraIntegrationTest
- void schemaHandlerWithReservedKeywordKeyspace(VertxTestContext context)
throws Exception
- {
- try (Session session = maybeGetSession())
- {
- session.execute("CREATE KEYSPACE \"keyspace\"" +
- " WITH REPLICATION = { 'class' :
'NetworkTopologyStrategy', 'replication_factor' : 1 };");
- }
-
- String testRoute = "/api/v1/schema/keyspaces/\"keyspace\"";
- testWithClient(context, client -> {
- client.get(server.actualPort(), "127.0.0.1", testRoute)
- .expect(ResponsePredicate.SC_OK)
- .send(context.succeeding(response -> {
- SchemaResponse schemaResponse =
response.bodyAsJson(SchemaResponse.class);
- assertThat(schemaResponse).isNotNull();
-
assertThat(schemaResponse.keyspace()).isEqualTo("keyspace");
- assertThat(schemaResponse.schema()).isNotNull();
- context.completeNow();
- }));
- });
- }
-}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
index 6356ab3f..87d95ed0 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
@@ -18,40 +18,26 @@
package org.apache.cassandra.sidecar.routes;
-import java.util.concurrent.Callable;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import com.datastax.driver.core.Session;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.dynamic.ClassFileLocator;
-import net.bytebuddy.dynamic.TypeResolutionStrategy;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.implementation.MethodDelegation;
-import net.bytebuddy.implementation.bind.annotation.SuperCall;
-import net.bytebuddy.pool.TypePool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
-import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.testing.CassandraIntegrationTest;
-import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.testing.CassandraTestContext;
-import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
import static org.assertj.core.api.Assertions.assertThat;
@@ -59,74 +45,16 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests the stream stats endpoint with cassandra container.
*/
-@ExtendWith(VertxExtension.class)
public class StreamStatsIntegrationTest extends IntegrationTestBase
{
- @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2,
network = true, buildCluster = false)
- void streamStatsTest(VertxTestContext context,
ConfigurableCassandraTestContext cassandraTestContext) throws Exception
- {
- BBHelperDecommissioningNode.reset();
- UpgradeableCluster cluster =
cassandraTestContext.configureAndStartCluster(
- builder ->
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
- IUpgradeableInstance node = cluster.get(2);
-
- createTestKeyspace();
- createTestTableAndPopulate();
-
- startAsync("Decommission node" + node.config().num(),
- () -> node.nodetoolResult("decommission",
"--force").asserts().success());
- AtomicBoolean hasStats = new AtomicBoolean(false);
- AtomicBoolean dataReceived = new AtomicBoolean(false);
-
- // Wait until nodes have reached expected state
- awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2,
TimeUnit.MINUTES, "transientStateStart");
-
- // optimal no. of attempts to poll for stats to capture streaming
stats during node decommissioning
- loopAssert(15, 200, () -> {
- StreamsProgressStats progressStats = streamStats(hasStats,
dataReceived);
- assertThat(hasStats).isTrue();
- assertThat(dataReceived)
- .describedAs("Stream Progress Stats - totalFilesReceived:" +
progressStats.totalFilesReceived() +
- " totalBytesReceived:" +
progressStats.totalBytesReceived())
- .isTrue();
- });
- ClusterUtils.awaitGossipStatus(node, node, "LEFT");
- BBHelperDecommissioningNode.transientStateEnd.countDown();
-
- context.completeNow();
- context.awaitCompletion(2, TimeUnit.MINUTES);
- }
-
- private StreamsProgressStats streamStats(AtomicBoolean hasStats,
AtomicBoolean dataReceived)
- {
- String testRoute = "/api/v1/cassandra/stats/streams";
- HttpResponse<Buffer> resp;
- resp = getBlocking(client.get(server.actualPort(), "127.0.0.1",
testRoute)
- .send());
- return assertStreamStatsResponseOK(resp, hasStats, dataReceived);
- }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StreamStatsIntegrationTest.class);
- StreamsProgressStats assertStreamStatsResponseOK(HttpResponse<Buffer>
response, AtomicBoolean hasStats, AtomicBoolean dataReceived)
+ @CassandraIntegrationTest(nodesPerDc = 2, network = true)
+ void streamStatsTest(CassandraTestContext cassandraTestContext)
{
-
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
- StreamStatsResponse streamStatsResponse =
response.bodyAsJson(StreamStatsResponse.class);
- assertThat(streamStatsResponse).isNotNull();
- StreamsProgressStats streamProgress =
streamStatsResponse.streamsProgressStats();
- assertThat(streamProgress).isNotNull();
- if (streamProgress.totalFilesToReceive() > 0)
- {
- hasStats.set(true);
- if (streamProgress.totalFilesReceived() > 0)
- {
- dataReceived.set(true);
-
assertThat(streamProgress.totalBytesReceived()).isGreaterThan(0);
- }
- }
- return streamProgress;
- }
+ UpgradeableCluster cluster = cassandraTestContext.cluster();
- QualifiedTableName createTestTableAndPopulate()
- {
+ createTestKeyspace(Map.of("datacenter1", 2));
QualifiedTableName tableName = createTestTable(
"CREATE TABLE %s ( \n" +
" race_year int, \n" +
@@ -135,54 +63,97 @@ public class StreamStatsIntegrationTest extends
IntegrationTestBase
" rank int, \n" +
" PRIMARY KEY ((race_year, race_name), rank) \n" +
");");
- Session session = maybeGetSession();
+ // craft inconsistency for repair
+ populateDataAtNode2Only(cluster, tableName);
+
+ // Poll stream stats while repair is running in the background.
+ CountDownLatch testStart = new CountDownLatch(1);
+ IUpgradeableInstance node = cluster.get(1);
+ AtomicReference<RuntimeException> nodetoolError = new
AtomicReference<>();
+ startRepairAsync(node, testStart, tableName, nodetoolError);
+
+ TestState testState = new TestState();
+ testStart.countDown();
+ loopAssert(10, 500, () -> {
+ if (nodetoolError.get() != null)
+ {
+ throw nodetoolError.get();
+ }
+ streamStats(testState);
+ testState.assertCompletion();
+ });
+ }
- session.execute("CREATE INDEX ryear ON " + tableName + "
(race_year);");
+ private void startRepairAsync(IUpgradeableInstance node, CountDownLatch
testStart, QualifiedTableName tableName, AtomicReference<RuntimeException>
nodetoolError)
+ {
+ startAsync("Repairing node" + node.config().num(),
+ () -> {
+ Uninterruptibles.awaitUninterruptibly(testStart);
+ try
+ {
+ node.nodetoolResult("repair", tableName.keyspace(),
tableName.tableName(), "--full").asserts().success();
+ }
+ catch (Throwable cause)
+ {
+ nodetoolError.set(new RuntimeException("Nodetool
failed", cause));
+ }
+ });
+ }
- for (int i = 1; i <= 1000; i++)
- {
- session.execute("INSERT INTO " + tableName + " (race_year,
race_name, rank, cyclist_name) " +
- "VALUES (2015, 'Tour of Japan - Stage 4 - Minami >
Shinshu', " + i + ", 'Benjamin PRADES');");
- }
- return tableName;
+ private void streamStats(TestState testState)
+ {
+ String testRoute = "/api/v1/cassandra/stats/streams";
+ StreamStatsResponse streamStatsResponse =
getBlocking(client.get(server.actualPort(), "127.0.0.1", testRoute)
+
.expect(ResponsePredicate.SC_OK)
+ .send())
+
.bodyAsJson(StreamStatsResponse.class);
+ assertThat(streamStatsResponse).isNotNull();
+ StreamsProgressStats streamProgress =
streamStatsResponse.streamsProgressStats();
+ assertThat(streamProgress).isNotNull();
+ LOGGER.info("Fetched {}", streamProgress);
+ testState.update(streamProgress);
}
- /**
- * ByteBuddy Helper for decommissioning node
- */
- public static class BBHelperDecommissioningNode
+ static class TestState
{
- static CountDownLatch transientStateStart = new CountDownLatch(1);
- static CountDownLatch transientStateEnd = new CountDownLatch(1);
+ StreamsProgressStats lastStats;
+ boolean streamStarted = false, streamCompleted = false;
- public static void install(ClassLoader cl, Integer nodeNumber)
+ void update(StreamsProgressStats streamProgress)
{
- if (nodeNumber == 2)
+ lastStats = streamProgress;
+ if (streamProgress.totalFilesToReceive() > 0)
+ {
+ streamStarted = true;
+ }
+
+ if (streamStarted && streamProgress.totalFilesReceived() ==
streamProgress.totalFilesToReceive())
{
- TypePool typePool = TypePool.Default.of(cl);
- TypeDescription description =
typePool.describe("org.apache.cassandra.streaming.StreamCoordinator")
- .resolve();
- new ByteBuddy().rebase(description,
ClassFileLocator.ForClassLoader.of(cl))
- .method(named("connectAllStreamSessions"))
-
.intercept(MethodDelegation.to(BBHelperDecommissioningNode.class))
- // Defer class loading until all dependencies
are loaded
- .make(TypeResolutionStrategy.Lazy.INSTANCE,
typePool)
- .load(cl,
ClassLoadingStrategy.Default.INJECTION);
+ streamCompleted = true;
}
}
- @SuppressWarnings("unused")
- public static void connectAllStreamSessions(@SuperCall
Callable<StreamOperation> orig) throws Exception
+ void assertCompletion()
{
- transientStateStart.countDown();
- Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
- orig.call();
+ assertThat(streamStarted)
+ .describedAs("Expecting to have non-empty stream stats. last
stats: " + lastStats)
+ .isTrue();
+ assertThat(streamCompleted)
+ .describedAs("Expecting to complete. last stats: " + lastStats)
+ .isTrue();
}
+ }
- public static void reset()
+ void populateDataAtNode2Only(UpgradeableCluster cluster,
QualifiedTableName tableName)
+ {
+ IInstance node = cluster.get(2);
+ // disable compaction for the table to have more file to stream
+ node.nodetoolResult("disableautocompaction", tableName.keyspace(),
tableName.tableName()).asserts().success();
+ for (int i = 1; i <= 100; i++)
{
- transientStateStart = new CountDownLatch(1);
- transientStateEnd = new CountDownLatch(1);
+ node.executeInternal("INSERT INTO " + tableName + " (race_year,
race_name, rank, cyclist_name) " +
+ "VALUES (2015, 'Tour of Japan - Stage 4 -
Minami > Shinshu', " + i + ", 'Benjamin PRADES');");
+ node.flush(TEST_KEYSPACE);
}
}
}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
index 21d46ce3..5b43c82f 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
@@ -75,9 +75,8 @@ public class SSTableImportHandlerIntegrationTest extends
IntegrationTestBase
// Truncate the table, insert more data.
// Test the import SSTable endpoint by importing data that was
originally truncated.
// Verify by querying the table contains all the results before
truncation and after truncation.
-
- Session session = maybeGetSession();
createTestKeyspace();
+ Session session = maybeGetSession();
QualifiedTableName tableName =
createTestTableAndPopulate(sidecarTestContext, Arrays.asList("a", "b"));
// create a snapshot called <tableName>-snapshot for tbl1
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
index ff0cbda3..0304ea5b 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import io.vertx.junit5.VertxExtension;
@@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Note: Some related test classes are broken down to have a single test case
to parallelize test execution and
* therefore limit the instance size required to run the tests from CircleCI
as the in-jvm-dtests tests are memory bound
*/
+@Tag("heavy")
@ExtendWith(VertxExtension.class)
class BasicMultiDCRf3Test extends BaseTokenRangeIntegrationTest
{
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 7377cf26..ba2e7ecd 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -73,7 +74,8 @@ public class CassandraSidecarTestContext implements
AutoCloseable
private final AbstractCassandraTestContext abstractCassandraTestContext;
private final Vertx vertx;
private final List<InstancesMetadataListener> instancesMetadataListeners;
- private int numInstancesToManage;
+ // array of nodeNums that are 1-based
+ private int[] instancesToManage = null;
public InstancesMetadata instancesMetadata;
private List<JmxClient> jmxClients;
private CQLSessionProvider sessionProvider;
@@ -86,11 +88,11 @@ public class CassandraSidecarTestContext implements
AutoCloseable
SimpleCassandraVersion version,
CassandraVersionProvider
versionProvider,
DnsResolver dnsResolver,
- int numInstancesToManage,
+ int[] instancesToManage,
SslConfiguration sslConfiguration)
{
this.vertx = vertx;
- this.numInstancesToManage = numInstancesToManage;
+ this.instancesToManage = instancesToManage;
this.instancesMetadataListeners = new ArrayList<>();
this.abstractCassandraTestContext = abstractCassandraTestContext;
this.version = version;
@@ -102,7 +104,7 @@ public class CassandraSidecarTestContext implements
AutoCloseable
public static CassandraSidecarTestContext from(Vertx vertx,
AbstractCassandraTestContext cassandraTestContext,
DnsResolver dnsResolver,
- int numInstancesToManage,
+ int[] instancesToManage,
SslConfiguration
sslConfiguration)
{
org.apache.cassandra.testing.SimpleCassandraVersion rootVersion =
cassandraTestContext.version;
@@ -115,7 +117,7 @@ public class CassandraSidecarTestContext implements
AutoCloseable
versionParsed,
versionProvider,
dnsResolver,
- numInstancesToManage,
+ instancesToManage,
sslConfiguration);
}
@@ -165,9 +167,9 @@ public class CassandraSidecarTestContext implements
AutoCloseable
return cluster;
}
- public void setNumInstancesToManage(int numInstancesToManage)
+ public void setInstancesToManage(int... instancesToManage)
{
- this.numInstancesToManage = numInstancesToManage;
+ this.instancesToManage = instancesToManage;
refreshInstancesMetadata();
}
@@ -178,16 +180,16 @@ public class CassandraSidecarTestContext implements
AutoCloseable
refreshInstancesMetadata();
}
- public InstancesMetadata instancesMetadata()
+ public synchronized InstancesMetadata instancesMetadata()
{
if (instancesMetadata == null)
{
- refreshInstancesMetadata();
+ return refreshInstancesMetadata();
}
return this.instancesMetadata;
}
- public InstancesMetadata refreshInstancesMetadata()
+ public synchronized InstancesMetadata refreshInstancesMetadata()
{
// clean-up any open sessions or client resources
close();
@@ -226,6 +228,8 @@ public class CassandraSidecarTestContext implements
AutoCloseable
{
instancesMetadata.instances().forEach(instance ->
instance.delegate().close());
}
+
+ closeSessionProvider();
}
private void setInstancesMetadata()
@@ -237,8 +241,18 @@ public class CassandraSidecarTestContext implements
AutoCloseable
}
}
- private InstancesMetadata buildInstancesMetadata(CassandraVersionProvider
versionProvider,
- DnsResolver dnsResolver)
+ public CQLSessionProviderImpl buildNewCqlSessionProvider()
+ {
+ UpgradeableCluster cluster = cluster();
+ List<IInstanceConfig> configs = buildInstanceConfigs(cluster);
+ List<InetSocketAddress> addresses = buildContactList(configs);
+ return new CQLSessionProviderImpl(addresses, addresses, 500, null,
+ 0, username, password,
+ sslConfiguration,
SharedExecutorNettyOptions.INSTANCE);
+ }
+
+ private synchronized InstancesMetadata
buildInstancesMetadata(CassandraVersionProvider versionProvider,
+ DnsResolver
dnsResolver)
{
UpgradeableCluster cluster = cluster();
List<InstanceMetadata> metadata = new ArrayList<>();
@@ -317,25 +331,42 @@ public class CassandraSidecarTestContext implements
AutoCloseable
@NotNull
private List<IInstanceConfig> buildInstanceConfigs(UpgradeableCluster
cluster)
{
- int nodes = numInstancesToManage == -1 ? cluster.size() :
numInstancesToManage;
- return IntStream.range(1, nodes + 1)
+ Set<Integer> testManagedInstances;
+ int maxNodeNum;
+ if (instancesToManage == null)
+ {
+ testManagedInstances = null;
+ maxNodeNum = cluster.size();
+ }
+ else
+ {
+ testManagedInstances =
Arrays.stream(instancesToManage).boxed().collect(Collectors.toSet());
+ // throws if test sets an empty array, it is a test configuration
error
+ maxNodeNum = Arrays.stream(instancesToManage).max().getAsInt();
+ }
+ return IntStream.range(1, maxNodeNum + 1)
.mapToObj(nodeNum -> {
// check whether the instances are managed by the
test framework first. Because the nodeNum might be greater than the cluster size
if (manageInstanceByTestFramework() &&
cluster.get(nodeNum).isShutdown())
{
return null;
}
- else
+
+ // Test supplies instances to manage. However, the
set does not contain this nodeNum
+ if (testManagedInstances != null &&
!testManagedInstances.contains(nodeNum))
{
- return
AbstractClusterUtils.createInstanceConfig(cluster, nodeNum);
+ return null;
}
+
+ // The node should be managed by sidecar
+ return
AbstractClusterUtils.createInstanceConfig(cluster, nodeNum);
})
.collect(Collectors.toList());
}
private boolean manageInstanceByTestFramework()
{
- return numInstancesToManage == -1;
+ return instancesToManage == null;
}
/**
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index 614cb445..55a5656d 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -113,6 +114,7 @@ public abstract class IntegrationTestBase
protected Injector injector;
private final List<Throwable> testExceptions = new ArrayList<>();
private Module testSpecificModule;
+ private CountDownLatch schemaInitialized = new CountDownLatch(1);;
@BeforeEach
void setup(AbstractCassandraTestContext cassandraTestContext, TestInfo
testInfo) throws Exception
@@ -143,7 +145,9 @@ public abstract class IntegrationTestBase
Module mergedModule = modules.stream().reduce((m1, m2) ->
Modules.override(m1).with(m2)).get();
injector = Guice.createInjector(mergedModule);
vertx = injector.getInstance(Vertx.class);
-
+ // register the handler for ON_SIDECAR_SCHEMA_INITIALIZED the earliest
+
vertx.eventBus().localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(),
+ msg -> schemaInitialized.countDown());
SslConfiguration sslConfig =
cassandraTestContext.annotation.authMode().equals(AuthMode.MUTUAL_TLS)
? sslConfigWithClientKeystoreTruststore()
: null;
@@ -155,13 +159,14 @@ public abstract class IntegrationTestBase
sslConfig = sslConfigWithTruststore();
}
sidecarTestContext = CassandraSidecarTestContext.from(vertx,
cassandraTestContext, DnsResolver.DEFAULT,
-
getNumInstancesToManage(clusterSize), sslConfig);
+
getInstancesToManage(clusterSize), sslConfig);
integrationTestModule.setCassandraTestContext(sidecarTestContext);
server = injector.getInstance(Server.class);
VertxTestContext context = new VertxTestContext();
- if (sidecarTestContext.isClusterBuilt())
+ boolean isClusterBuilt = sidecarTestContext.isClusterBuilt();
+ if (isClusterBuilt)
{
MessageConsumer<JsonObject> cqlReadyConsumer = vertx.eventBus()
.localConsumer(ON_CASSANDRA_CQL_READY.address());
@@ -172,10 +177,11 @@ public abstract class IntegrationTestBase
}
client = mTLSClient();
+ beforeServerStart();
server.start()
.onSuccess(s -> {
sidecarTestContext.registerInstanceConfigListener(this::healthCheck);
- if (!sidecarTestContext.isClusterBuilt())
+ if (!isClusterBuilt)
{
// Give everything a moment to get started and connected
vertx.setTimer(TimeUnit.SECONDS.toMillis(1), id1 ->
context.completeNow());
@@ -184,6 +190,12 @@ public abstract class IntegrationTestBase
.onFailure(context::failNow);
context.awaitCompletion(5, TimeUnit.SECONDS);
+
+ // add a listener to refresh instance metadata when cluster is not yet
built when starting server
+ if (!isClusterBuilt)
+ {
+ cassandraTestContext.setClusterBuiltListener(cluster ->
sidecarTestContext.refreshInstancesMetadata());
+ }
}
@AfterEach
@@ -203,6 +215,10 @@ public abstract class IntegrationTestBase
{
}
+ protected void beforeServerStart()
+ {
+ }
+
protected void installTestSpecificModule(Module testSpecificModule)
{
this.testSpecificModule = testSpecificModule;
@@ -210,11 +226,7 @@ public abstract class IntegrationTestBase
protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
{
- CountDownLatch latch = new CountDownLatch(1);
- vertx.eventBus()
-
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg
-> latch.countDown());
- awaitLatchOrTimeout(latch, timeout, timeUnit);
- assertThat(latch.getCount()).describedAs("Sidecar schema not
initialized").isZero();
+ awaitLatchOrTimeout(schemaInitialized, timeout, timeUnit, "Wait for
schema initialization");
}
/**
@@ -224,11 +236,11 @@ public abstract class IntegrationTestBase
* Defaults to the entire cluster.
*
* @param clusterSize the size of the cluster as defined by the
integration test
- * @return the number of instances to manage; or -1 to let test framework
to determine the cluster size at the runtime
+ * @return the instances to manage; or null to let test framework to
determine the cluster size at the runtime
*/
- protected int getNumInstancesToManage(int clusterSize)
+ protected int[] getInstancesToManage(int clusterSize)
{
- return -1;
+ return null;
}
protected void testWithClient(Consumer<WebClient> tester)
@@ -274,36 +286,17 @@ public abstract class IntegrationTestBase
}
}
- protected void testWithClientBlocking(boolean waitForCluster,
- Consumer<WebClient> tester)
- {
- CassandraAdapterDelegate delegate =
sidecarTestContext.instancesMetadata()
-
.instanceFromId(1)
- .delegate();
-
- assertThat(delegate).isNotNull();
- if (delegate.isNativeUp() || !waitForCluster)
- {
- tester.accept(client);
- }
- else
- {
- vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
- if (message.body().getInteger("cassandraInstanceId") == 1)
- {
- tester.accept(client);
- }
- });
- }
-
- }
-
protected void createTestKeyspace()
{
createTestKeyspace(ImmutableMap.of(DATA_CENTER_PREFIX + 1, 1));
}
protected void createTestKeyspace(Map<String, Integer> rf)
+ {
+ createKeyspace(TEST_KEYSPACE, rf);
+ }
+
+ protected void createKeyspace(String keyspaceName, Map<String, Integer> rf)
{
int attempts = 1;
ArrayList<Throwable> thrown = new ArrayList<>(5);
@@ -311,12 +304,13 @@ public abstract class IntegrationTestBase
{
try
{
- sidecarTestContext.refreshInstancesMetadata();
-
Session session = maybeGetSession();
- session.execute("CREATE KEYSPACE " + IF_NOT_EXISTS + " " +
TEST_KEYSPACE
- + " WITH REPLICATION = { 'class' :
'NetworkTopologyStrategy', " + generateRfString(rf) + " };");
+ ResultSet rs = session.execute("CREATE KEYSPACE " +
IF_NOT_EXISTS + " " + keyspaceName
+ + " WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy', " + generateRfString(rf) + " };");
+ assertThat(rs.getExecutionInfo().isSchemaInAgreement())
+ .describedAs("Schema agreement is not reached")
+ .isTrue();
return;
}
catch (Throwable t)
@@ -396,7 +390,7 @@ public abstract class IntegrationTestBase
}
// similar to awaitLatchOrTimeout, it throws either test exceptions (due
to startAsync failures) or timeout exception
- protected void awaitLatchOrThrow(CountDownLatch latch, long duration,
TimeUnit timeUnit, String latchName)
+ public void awaitLatchOrThrow(CountDownLatch latch, long duration,
TimeUnit timeUnit, String latchName)
{
String hint = latchName == null ? "" : '(' + latchName + ')';
boolean completed = Uninterruptibles.awaitUninterruptibly(latch,
duration, timeUnit);
@@ -409,7 +403,7 @@ public abstract class IntegrationTestBase
throw new AssertionError("Latch " + hint + " times out after " +
duration + ' ' + timeUnit.name());
}
- protected static void awaitLatchOrTimeout(CountDownLatch latch, long
duration, TimeUnit timeUnit, String latchName)
+ public static void awaitLatchOrTimeout(CountDownLatch latch, long
duration, TimeUnit timeUnit, String latchName)
{
String hint = latchName == null ? "" : '(' + latchName + ')';
assertThat(Uninterruptibles.awaitUninterruptibly(latch, duration,
timeUnit))
@@ -417,7 +411,7 @@ public abstract class IntegrationTestBase
.isTrue();
}
- protected static void awaitLatchOrTimeout(CountDownLatch latch, long
duration, TimeUnit timeUnit)
+ public static void awaitLatchOrTimeout(CountDownLatch latch, long
duration, TimeUnit timeUnit)
{
awaitLatchOrTimeout(latch, duration, timeUnit, null);
}
@@ -468,7 +462,6 @@ public abstract class IntegrationTestBase
context.completeNow();
}
-
private static QualifiedTableName uniqueTestTableFullName(String
tablePrefix)
{
String uniqueTableName = tablePrefix + TEST_TABLE_ID.getAndIncrement();
@@ -520,7 +513,7 @@ public abstract class IntegrationTestBase
CertificateBuilder builder = new CertificateBuilder()
.subject("CN=Apache Cassandra, OU=ssl_test,
O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
.addSanDnsName("localhost")
- .addSanIpAddress("127.0.0.1")
+ .addSanIpAddress(subjectAlternativeNameIpAddress())
.addSanUriName(identity);
if (expired)
{
@@ -548,6 +541,11 @@ public abstract class IntegrationTestBase
.build();
}
+ protected String subjectAlternativeNameIpAddress()
+ {
+ return "127.0.0.1";
+ }
+
protected WebClient createClient(Path clientKeystorePath, Path
truststorePath)
{
WebClientOptions options = new WebClientOptions();
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index df165815..ba5cf3b9 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -32,6 +32,7 @@ import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
@@ -47,12 +48,18 @@ import
org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.ParameterizedClassConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SSTableUploadConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
+import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask;
+import org.apache.cassandra.sidecar.coordination.ElectorateMembership;
+import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor;
import
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
import org.jetbrains.annotations.NotNull;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
@@ -99,6 +106,7 @@ public class IntegrationTestModule extends AbstractModule
.isEnabled(true)
.build())
.coordinationConfiguration(clusterLeaseClaimTaskConfiguration)
+ .sstableUploadConfiguration(new
SSTableUploadConfigurationImpl(0F))
.build();
PeriodicTaskConfiguration healthCheckConfiguration
= new PeriodicTaskConfigurationImpl(true,
@@ -125,11 +133,54 @@ public class IntegrationTestModule extends AbstractModule
.build();
}
+ @Provides
+ @Singleton
+ public ClusterLeaseClaimTask clusterLeaseClaimTask(Vertx vertx,
+ ServiceConfiguration
serviceConfiguration,
+ ElectorateMembership
electorateMembership,
+
SidecarLeaseDatabaseAccessor accessor,
+ ClusterLease
clusterLease,
+ SidecarMetrics metrics)
+ {
+ return new ClusterLeaseClaimTask(vertx,
+ serviceConfiguration,
+ electorateMembership,
+ accessor,
+ clusterLease,
+ metrics)
+ {
+ @Override
+ public DurationSpec delay()
+ {
+ return
serviceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().executeInterval();
+ }
+
+ @Override
+ public DurationSpec initialDelay()
+ {
+ return
serviceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelay();
+ }
+
+ @Override
+ public ScheduleDecision scheduleDecision()
+ {
+ // stop further executions if cluster lease is already
claimed; otherwise, run it, regardless of ElectorateMembership
+ if (!accessor.isAvailable() ||
clusterLease.isClaimedByLocalSidecar())
+ {
+ return ScheduleDecision.SKIP;
+ }
+ return ScheduleDecision.EXECUTE;
+ }
+ };
+ }
+
@Provides
@Singleton
public CoordinationConfiguration clusterLeaseClaimTaskConfiguration()
{
- return new CoordinationConfigurationImpl(new
PeriodicTaskConfigurationImpl());
+ return new CoordinationConfigurationImpl(new
PeriodicTaskConfigurationImpl(true,
+
MillisecondBoundConfiguration.parse("1s"),
+
MillisecondBoundConfiguration.parse("1s")));
}
@Provides
@@ -161,13 +212,6 @@ public class IntegrationTestModule extends AbstractModule
return cqlSessionProvider;
}
- @Provides
- @Singleton
- public ClusterLease clusterLease()
- {
- return new ClusterLease(ClusterLease.Ownership.CLAIMED);
- }
-
private AccessControlConfiguration accessControlConfiguration()
{
Map<String, String> params = new HashMap<String, String>()
diff --git
a/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
b/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
index 111205a8..2ff58a1b 100644
---
a/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
+++
b/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,8 @@ public abstract class AbstractCassandraTestContext implements
AutoCloseable
public final SimpleCassandraVersion version;
private final Map<String, String> initialProperties;
- protected UpgradeableCluster cluster;
+ private UpgradeableCluster cluster;
+ private Consumer<UpgradeableCluster> onClusterBuilt;
// certificates created when cluster is started with auth
public final CertificateBundle ca;
@@ -78,6 +80,24 @@ public abstract class AbstractCassandraTestContext
implements AutoCloseable
return cluster;
}
+ public void setClusterBuiltListener(Consumer<UpgradeableCluster> listener)
+ {
+ this.onClusterBuilt = listener;
+ if (cluster != null)
+ {
+ onClusterBuilt.accept(cluster);
+ }
+ }
+
+ protected void setCluster(UpgradeableCluster cluster)
+ {
+ this.cluster = cluster;
+ if (onClusterBuilt != null)
+ {
+ onClusterBuilt.accept(cluster);
+ }
+ }
+
@Override
public void close()
{
diff --git
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
index 905daa45..5c7f47be 100644
---
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
+++
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
@@ -30,7 +30,6 @@ import
org.apache.cassandra.testing.utils.tls.CertificateBundle;
*/
public class CassandraTestContext extends AbstractCassandraTestContext
{
-
public CassandraTestContext(SimpleCassandraVersion version,
UpgradeableCluster cluster,
CertificateBundle ca,
@@ -46,7 +45,7 @@ public class CassandraTestContext extends
AbstractCassandraTestContext
{
return "CassandraTestContext{"
+ "version=" + version
- + ", cluster=" + cluster
+ + ", cluster=" + cluster()
+ '}';
}
}
diff --git
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
index b494b8f2..ff50a761 100644
---
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
+++
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
@@ -183,7 +183,7 @@ public class CassandraTestTemplate implements
TestTemplateInvocationContextProvi
Path tempDirPath = Files.createTempDirectory("certs");
CertificateBundle ca = ca();
- Path serverKeystorePath = serverKeystorePath(ca, tempDirPath);
+ Path serverKeystorePath = serverKeystorePath(ca, tempDirPath,
finalNodeCount);
Path truststorePath = truststorePath(ca, tempDirPath);
switch (annotation.authMode())
@@ -382,13 +382,16 @@ public class CassandraTestTemplate implements
TestTemplateInvocationContextProvi
return ca.toTempKeyStorePath(path, truststorePassword.toCharArray(),
truststorePassword.toCharArray());
}
- private Path serverKeystorePath(CertificateBundle ca, Path path) throws
Exception
+ private Path serverKeystorePath(CertificateBundle ca, Path path, int
totalNodes) throws Exception
{
- CertificateBundle keystore = new CertificateBuilder()
- .subject("CN=Apache Cassandra,
OU=ssl_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
- .addSanDnsName("localhost")
- .addSanIpAddress("127.0.0.1")
- .buildIssuedBy(ca);
+ CertificateBuilder builder = new CertificateBuilder();
+ builder.subject("CN=Apache Cassandra, OU=ssl_test, O=Unknown,
L=Unknown, ST=Unknown, C=Unknown")
+ .addSanDnsName("localhost");
+ for (int i = 1; i <= totalNodes; i++)
+ {
+ builder.addSanIpAddress("127.0.0." + i);
+ }
+ CertificateBundle keystore = builder.buildIssuedBy(ca);
return keystore.toTempKeyStorePath(path,
serverKeystorePassword.toCharArray(), serverKeystorePassword.toCharArray());
}
@@ -427,8 +430,7 @@ public class CassandraTestTemplate implements
TestTemplateInvocationContextProvi
static
{
- // Settings to reduce the test setup delay incurred if gossip is
enabled
- System.setProperty("cassandra.ring_delay_ms", "5000"); // down from
30s default
+ System.setProperty("cassandra.ring_delay_ms", "5000"); // down from
30s default; this change has no effect if GOSSIP feature is enabled
System.setProperty("cassandra.consistent.rangemovement", "false");
System.setProperty("cassandra.consistent.simultaneousmoves.allow",
"true");
// End gossip delay settings
diff --git
a/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
b/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
index d52d96de..7500a7e9 100644
---
a/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
+++
b/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
@@ -46,7 +46,8 @@ public class ConfigurableCassandraTestContext extends
AbstractCassandraTestConte
public UpgradeableCluster
configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator)
{
configurator.accept(builder);
- cluster = CassandraTestTemplate.retriableStartCluster(builder, 3);
+ UpgradeableCluster cluster =
CassandraTestTemplate.retriableStartCluster(builder, 3);
+ setCluster(cluster);
return cluster;
}
@@ -55,6 +56,7 @@ public class ConfigurableCassandraTestContext extends
AbstractCassandraTestConte
{
return "ConfigurableCassandraTestContext{"
+ ", version=" + version
+ + ", cluster=" + cluster()
+ ", builder=" + builder
+ '}';
}
diff --git a/server/src/test/resources/logback-in-jvm-dtest.xml
b/server/src/test/resources/logback-in-jvm-dtest.xml
index b6e574a2..9770bd2a 100644
--- a/server/src/test/resources/logback-in-jvm-dtest.xml
+++ b/server/src/test/resources/logback-in-jvm-dtest.xml
@@ -20,18 +20,23 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
- <level>INFO</level>
+ <level>DEBUG</level>
</filter>
<encoder>
<pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
</encoder>
</appender>
- <root level="INFO">
+ <root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
-
+ <logger name="org.apache.cassandra.sidecar" level="DEBUG" />
+ <logger name="org.apache.cassandra" level="INFO" />
+ <logger name="io.netty" level="WARN" />
+ <logger name="shaded.io.netty" level="WARN" />
+ <logger name="shaded.com.datastax.shaded.netty" level="WARN" />
+ <logger name="o.a.c.sidecar.client.shaded.io.netty" level="WARN" />
<logger name="com.datastax.driver.core" level="ERROR" />
<logger name="com.datastax.driver.core.ControlConnection" level="OFF" />
</configuration>
diff --git a/vertx-auth-mtls/build.gradle b/vertx-auth-mtls/build.gradle
index ba2b248f..c4449efe 100644
--- a/vertx-auth-mtls/build.gradle
+++ b/vertx-auth-mtls/build.gradle
@@ -39,6 +39,7 @@ test {
println("Destination directory for vertx-auth-mtls tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git a/vertx-client-shaded/build.gradle b/vertx-client-shaded/build.gradle
index 464df4fa..23bce206 100644
--- a/vertx-client-shaded/build.gradle
+++ b/vertx-client-shaded/build.gradle
@@ -61,6 +61,7 @@ tasks.named('test') {
println("Destination directory for vertx-client-shaded tests:
${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
diff --git a/vertx-client/build.gradle b/vertx-client/build.gradle
index 2afd2260..b825cf08 100644
--- a/vertx-client/build.gradle
+++ b/vertx-client/build.gradle
@@ -44,6 +44,7 @@ test {
println("Destination directory for vertx-client tests: ${destDir}")
junitXml.getOutputLocation().set(destDir)
html.setRequired(true)
+ html.getOutputLocation().set(destDir)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]