bbotella commented on code in PR #166: URL: https://github.com/apache/cassandra-sidecar/pull/166#discussion_r1897997868
########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream progress info + */ +public class ProgressInfo +{ + + public final String peer; + public final int sessionIndex; + public final String fileName; + public final String direction; + public final long currentBytes; + public final long totalBytes; + + public ProgressInfo(CompositeData data) + { + Review Comment: Remove extra line ########## server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Session; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the stream stats endpoint with cassandra container. + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, network = true, buildCluster = false) + void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws InterruptedException + { + + BBHelperDecommissioningNode.reset(); + UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster( + builder -> builder.withInstanceInitializer(BBHelperDecommissioningNode::install)); + IUpgradeableInstance node = cluster.get(5); + IUpgradeableInstance seed = cluster.get(1); + + createTestKeyspace(); + createTestTableAndPopulate(); + + startAsync("Decommission node" + node.config().num(), + () -> node.nodetoolResult("decommission").asserts().success()); + AtomicBoolean hasStats = new AtomicBoolean(false); + AtomicBoolean dataReceived = new AtomicBoolean(false); + + // Wait until nodes have reached expected state + awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); + + ClusterUtils.awaitRingState(seed, node, "Leaving"); + BBHelperDecommissioningNode.transientStateEnd.countDown(); + + for (int i = 0; i < 20; i++) + { + startAsync("Request-" + i , () -> { + try + { + streamStats(context, hasStats, dataReceived); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + assertThat(hasStats).isTrue(); + assertThat(dataReceived).isTrue(); + context.completeNow(); + context.awaitCompletion(2, TimeUnit.MINUTES); + } + + private void streamStats(VertxTestContext context, AtomicBoolean hasStats, AtomicBoolean dataReceived) throws Exception + { + String testRoute = "/api/v1/cassandra/stats/stream"; + testWithClient(context, client -> { + BBHelperDecommissioningNode.transientStateEnd.countDown(); + client.get(server.actualPort(), "127.0.0.1", testRoute) + .send(context.succeeding(response -> { + assertRingResponseOK(response, sidecarTestContext, hasStats, dataReceived); + })); + }); + } + + void assertRingResponseOK(HttpResponse<Buffer> response, CassandraSidecarTestContext cassandraTestContext, + AtomicBoolean hasStats, AtomicBoolean dataReceived) + { + StreamStatsResponse streamStatsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(streamStatsResponse).isNotNull(); + StreamProgressStats streamProgress = streamStatsResponse.streamProgressStats(); + assertThat(streamProgress).isNotNull(); + if (streamProgress.totalFilesToReceive() > 0) + { + hasStats.set(true); + if (streamProgress.totalFilesToReceive() == streamProgress.totalFilesReceived() && Review Comment: Do we need an else here? ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamProgressStats.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response.data; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A class representing stats summarizing the progress of streamed bytes and files on the node + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class StreamProgressStats +{ + @JsonProperty("totalFilesToReceive") + public long totalFilesToReceive() + { + return totalFilesToReceive; + } + + @JsonProperty("totalFilesReceived") + public long totalFilesReceived() + { + return totalFilesReceived; + } + + @JsonProperty("totalBytesToReceive") + public long totalBytesToReceive() + { + return totalBytesToReceive; + } + + @JsonProperty("totalBytesReceived") + public long totalBytesReceived() + { + return totalBytesReceived; + } + + @JsonProperty("totalFilesToSend") + public long totalFilesToSend() + { + return totalFilesToSend; + } + + @JsonProperty("totalFilesSent") + public long totalFilesSent() + { + return totalFilesSent; + } + + @JsonProperty("totalBytesToSend") + public long totalBytesToSend() + { + return totalBytesToSend; + } + + @JsonProperty("totalBytesSent") + public long totalBytesSent() + { + return totalBytesSent; + } + + /* Across all sessions + * private final long sendProgress; from "sent to" Review Comment: ```suggestion private final long sendProgress; from "sent to" ``` ########## server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java: ########## @@ -372,6 +374,9 @@ public Router vertxRouter(Vertx vertx, router.get(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE) .handler(listOperationalJobsHandler); + router.get(ApiEndpointsV1.STREAM_STATS_ROUTE) Review Comment: If (hopefully when :-P ) [this PR](https://github.com/apache/cassandra-sidecar/pull/164) gets merged, I would love to see the new API definition updated there as well. Let's work together to make that happen. ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java: ########## @@ -1573,6 +1575,36 @@ public void onError(Throwable throwable) assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testStreamsStats() throws Exception + { + String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," + + "\"streamProgressStats\":{\"totalFilesToReceive\":7," + + "\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," + + "\"totalBytesReceived\":15088,\"totalFilesToSend\":0,\"totalFilesSent\":0," + + "\"totalBytesToSend\":0,\"totalBytesSent\":0}}"; + + MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(streamStatsResponseAsString); + enqueue(response); + StreamStatsResponse result = client.streamsStats().get(); + + assertThat(result).isNotNull(); + assertThat(result.operationMode()).isNotNull().isEqualTo("NORMAL"); + StreamProgressStats progressStats = result.streamProgressStats(); + assertThat(progressStats).isNotNull(); + assertThat(progressStats.totalBytesReceived()).isNotNull(); + assertThat(progressStats.totalBytesSent()).isNotNull(); + assertThat(progressStats.totalBytesToReceive()).isNotNull().isEqualTo(progressStats.totalBytesReceived()); + assertThat(progressStats.totalBytesToSend()).isNotNull(); + assertThat(progressStats.totalFilesToReceive()).isNotNull(); + assertThat(progressStats.totalFilesToSend()).isNotNull(); + assertThat(progressStats.totalFilesReceived()).isNotNull().isEqualTo(progressStats.totalFilesToReceive()); + assertThat(progressStats.totalFilesSent()).isNotNull(); + Review Comment: ```suggestion ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStreamManagerOperations.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.adapters.base.data.SessionInfo; +import org.apache.cassandra.sidecar.adapters.base.data.StreamState; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; + +import static org.apache.cassandra.sidecar.adapters.base.StreamManagerJmxOperations.STREAM_MANAGER_OBJ_NAME; + +/** + * An implementation of the {@link StreamManagerOperations} that interfaces with Cassandra 4.0 and later + */ +public class CassandraStreamManagerOperations implements StreamManagerOperations +{ + + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraStorageOperations.class); + protected final JmxClient jmxClient; + + /** + * Creates a new instance with the provided {@link JmxClient} and {@link DnsResolver} + * + * @param jmxClient the JMX client used to communicate with the Cassandra instance + * @param dnsResolver the DNS resolver used to lookup replicas + */ + public CassandraStreamManagerOperations(JmxClient jmxClient, DnsResolver dnsResolver) + { + this.jmxClient = jmxClient; + } + + public StreamProgressStats getStreamProgressStats() + { + Set<CompositeData> streamData = jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME) + .getCurrentStreams(); + + List<StreamState> streamStates = streamData.stream().map(StreamState::new).collect(Collectors.toList()); + return computeStats(streamStates); + } + + private StreamProgressStats computeStats(List<StreamState> streamStates) + { + List<SessionInfo> sessions = streamStates.stream().map(s -> s.sessions()).flatMap(Collection::stream).collect(Collectors.toList()); Review Comment: I understand this is by no means as hot path as the ones the [dev thread](https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36) was referring to, but I wonder if we should keep that choice in sidecar of avoiding streams in non test code. Same for the other streams on the PR ########## server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.stubbing.Answer; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link StreamStatsHandler} + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsHandlerTest +{ + + static final Logger LOGGER = LoggerFactory.getLogger(StreamStatsHandlerTest.class); + Vertx vertx; + Server server; + + @BeforeEach + void before() throws InterruptedException + { + Module testOverride = Modules.override(new TestModule()) + .with(new StreamingStatsTestModule()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testStreamingStatsHandler(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); Review Comment: Same comment with quick theories testing library. ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java: ########## @@ -1573,6 +1575,36 @@ public void onError(Throwable throwable) assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testStreamsStats() throws Exception + { + String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," + + "\"streamProgressStats\":{\"totalFilesToReceive\":7," + + "\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," + Review Comment: These numbers make me wonder if it would be worth to consider adding [Quick Theories](https://github.com/quicktheories/QuickTheories) testing library to sidecar. I could see the case of leaving that out of this PR, but it would definitely increase coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

