yifan-c commented on code in PR #188:
URL: https://github.com/apache/cassandra-sidecar/pull/188#discussion_r1945579617
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -42,109 +40,133 @@
import net.bytebuddy.pool.TypePool;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.testing.CassandraIntegrationTest;
-import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.testing.CassandraTestContext;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/**
* Tests the stream stats endpoint with cassandra container.
*/
-@ExtendWith(VertxExtension.class)
public class StreamStatsIntegrationTest extends IntegrationTestBase
{
- @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2,
network = true, buildCluster = false)
- void streamStatsTest(VertxTestContext context,
ConfigurableCassandraTestContext cassandraTestContext) throws Exception
+ @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2,
network = true)
+ void streamStatsTest(CassandraTestContext cassandraTestContext)
{
- BBHelperDecommissioningNode.reset();
- UpgradeableCluster cluster =
cassandraTestContext.configureAndStartCluster(
- builder ->
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
- IUpgradeableInstance node = cluster.get(2);
-
- createTestKeyspace();
- createTestTableAndPopulate();
-
- startAsync("Decommission node" + node.config().num(),
- () -> node.nodetoolResult("decommission",
"--force").asserts().success());
- AtomicBoolean hasStats = new AtomicBoolean(false);
- AtomicBoolean dataReceived = new AtomicBoolean(false);
-
- // Wait until nodes have reached expected state
- awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2,
TimeUnit.MINUTES, "transientStateStart");
-
- // optimal no. of attempts to poll for stats to capture streaming
stats during node decommissioning
- loopAssert(15, 200, () -> {
- StreamsProgressStats progressStats = streamStats(hasStats,
dataReceived);
- assertThat(hasStats).isTrue();
- assertThat(dataReceived)
- .describedAs("Stream Progress Stats - totalFilesReceived:" +
progressStats.totalFilesReceived() +
- " totalBytesReceived:" +
progressStats.totalBytesReceived())
- .isTrue();
- });
- ClusterUtils.awaitGossipStatus(node, node, "LEFT");
- BBHelperDecommissioningNode.transientStateEnd.countDown();
+// BBHelperDecommissioningNode.reset();
+ UpgradeableCluster cluster = cassandraTestContext.cluster();
+// cassandraTestContext.configureAndStartCluster(
+// builder ->
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
- context.completeNow();
- context.awaitCompletion(2, TimeUnit.MINUTES);
+ createTestKeyspace(Map.of("datacenter1", 2));
+ QualifiedTableName tableName = createTestTable(
+ "CREATE TABLE %s ( \n" +
+ " race_year int, \n" +
+ " race_name text, \n" +
+ " cyclist_name text, \n" +
+ " rank int, \n" +
+ " PRIMARY KEY ((race_year, race_name), rank) \n" +
+ ");");
+ // craft inconsistency for repair
+ populateDataAtNode2Only(cluster, tableName);
+
+ // Poll stream stats while repair is running in the background.
+ CountDownLatch testStart = new CountDownLatch(1);
+ IUpgradeableInstance node = cluster.get(1);
+ AtomicReference<Throwable> nodetoolError = new AtomicReference<>();
+ startAsync("Repairing node" + node.config().num(),
Review Comment:
Changed the stream trigger from decommission to repair. The test also no
longer needs ByteBuddy to intercept, hence removing it.
We should avoid ByteBuddy wherever possible, to avoid depending on
Cassandra's implementation details.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]