frankgh commented on code in PR #166: URL: https://github.com/apache/cassandra-sidecar/pull/166#discussion_r1894420568
########## server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import 
org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.stubbing.Answer; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link StreamStatsHandler} + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsHandlerTest +{ + + static final Logger LOGGER = LoggerFactory.getLogger(StreamStatsHandlerTest.class); + Vertx vertx; + Server server; + + @BeforeEach + void before() throws InterruptedException + { + Module testOverride = Modules.override(new TestModule()) + .with(new StreamingStatsTestModule()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void 
testStreamingStatsHandler(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; Review Comment: NIT ```suggestion return new StreamStatsResponse("NORMAL", new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamProgressStats.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response.data; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A class representing stats summarizing the progress of streamed bytes and files on the node + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class StreamProgressStats +{ + @JsonProperty("totalFilesToReceive") + public long totalFilesToReceive() Review Comment: class layout is wrong, can you please follow the code style guidelines. 
########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = 
parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. + */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. 
+ */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() + { + return getTotalFiles(sendingSummaries); + } + + private long getTotalSizes(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long getTotalSizeToReceive() + { + return getTotalSizes(receivingSummaries); + } + + private long getTotalSizeInProgress(Collection<ProgressInfo> streams) + { + long total = 0; + for (ProgressInfo stream : streams) + total += stream.currentBytes; + return total; + } + + /** + * @return total size(in bytes) to send in the session + */ + public long getTotalSizeToSend() + { + return getTotalSizes(sendingSummaries); + } + + private long getTotalFilesCompleted(Collection<ProgressInfo> files) + { + Iterable<ProgressInfo> completed = Iterables.filter(files, input -> input.isCompleted()); + return Iterables.size(completed); + } + + /** + * @return total number of files already received. + */ + public long getTotalFilesReceived() + { + return getTotalFilesCompleted(receivingFiles); + } + + /** + * @return total number of files already sent. 
+ */ + public long getTotalFilesSent() + { + return getTotalFilesCompleted(sendingFiles); + } + + Review Comment: ```suggestion ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java: ########## @@ -176,6 +177,15 @@ public MetricsOperations metricsOperations() return new CassandraMetricsOperations(cqlSessionProvider); } + /** + * {@inheritDoc} + */ + @Override + public StreamManagerOperations streamManagerOperations() Review Comment: I think this and `MetricsOperations` are essentially the same. We should fold stats into a single interface. Conceptually, each of these interfaces abstracts access to functionality behind Cassandra. The abstraction here is access to statistics from Cassandra. I think `MetricsOperations` does not really capture what we are trying to abstract, and adding another interface here doesn't make sense. I would avoid trying to have a one-to-one mapping from interface to C* bean here because in C*'s code base the functionality was exposed through a small set of beans. In recent contributions to the C* codebase I've seen that we are making a better effort to separate bean access into something that makes sense logically. For that reason we should really think about some abstraction that makes sense from a point of view of what we are trying to access, and maybe not focus too much on how things are exposed in Cassandra. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream summary data + */ +public class StreamSummary +{ + public final String tableId; + + /** + * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. + */ + public final int files; + public final long totalSize; + Review Comment: ```suggestion ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream state data + */ +public class StreamState +{ + Collection<SessionInfo> sessions; + + public StreamState(CompositeData data) + { + this.sessions = parseSessions((CompositeData[]) data.get("sessions")); + } + + private Collection<SessionInfo> parseSessions(CompositeData[] sessions) + { + return Arrays.stream(sessions).map(SessionInfo::new).collect(Collectors.toList()); Review Comment: also agree with Bernardo that streams should be used carefully. This is less readable to me than a good old for loop. ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java: ########## @@ -1573,6 +1575,36 @@ public void onError(Throwable throwable) assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testStreamsStats() throws Exception + { + String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," + + "\"streamProgressStats\":{\"totalFilesToReceive\":7," + + "\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," + + "\"totalBytesReceived\":15088,\"totalFilesToSend\":0,\"totalFilesSent\":0," + + "\"totalBytesToSend\":0,\"totalBytesSent\":0}}"; + + MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(streamStatsResponseAsString); + enqueue(response); + StreamStatsResponse result = client.streamsStats().get(); + + assertThat(result).isNotNull(); + assertThat(result.operationMode()).isNotNull().isEqualTo("NORMAL"); + StreamProgressStats progressStats = result.streamProgressStats(); + assertThat(progressStats).isNotNull(); + assertThat(progressStats.totalBytesReceived()).isNotNull(); Review Comment: no validation of the actual values? 
########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java: ########## @@ -1573,6 +1575,36 @@ public void onError(Throwable throwable) assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testStreamsStats() throws Exception + { + String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," + + "\"streamProgressStats\":{\"totalFilesToReceive\":7," + + "\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," + + "\"totalBytesReceived\":15088,\"totalFilesToSend\":0,\"totalFilesSent\":0," + + "\"totalBytesToSend\":0,\"totalBytesSent\":0}}"; Review Comment: maybe let's try with non-zero values, since 0 is the default ########## server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java: ########## @@ -102,4 +102,6 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab { outOfRangeDataCleanup(keyspace, table, 1); } + + String getOperationMode(); Review Comment: should be `operationMode` and there are no javadocs here ```suggestion String operationMode(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import com.google.inject.Inject; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; + +/** + * Handler for retrieving node streams stats + */ +public class StreamStatsHandler extends AbstractHandler<Void> +{ + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the metadata fetcher + * @param executorPools executor pools for blocking executions + */ + @Inject + protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools) + { + super(metadataFetcher, executorPools, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + Void request) + { + + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + if (delegate == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } + + StorageOperations storageOperations = delegate.storageOperations(); + StreamManagerOperations streamMgrOperations = delegate.streamManagerOperations(); + if (storageOperations == null || streamMgrOperations == null) + { + 
context.fail(cassandraServiceUnavailable()); + return; + } Review Comment: this changed in https://issues.apache.org/jira/browse/CASSSIDECAR-182, and should be simplified to not do any null checks. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream progress info + */ +public class ProgressInfo +{ + Review Comment: ```suggestion ``` ########## server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import 
org.apache.cassandra.sidecar.server.Server; +import org.mockito.stubbing.Answer; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link StreamStatsHandler} + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsHandlerTest +{ + + static final Logger LOGGER = LoggerFactory.getLogger(StreamStatsHandlerTest.class); + Vertx vertx; + Server server; + + @BeforeEach + void before() throws InterruptedException + { + Module testOverride = Modules.override(new TestModule()) + .with(new StreamingStatsTestModule()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testStreamingStatsHandler(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; + }; + + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/stats/stream"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + 
+ assertThat(response.statusCode()).isEqualTo(OK.code()); + StreamStatsResponse statsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(statsResponse).isNotNull(); + assertThat(statsResponse.operationMode()).isEqualTo("NORMAL"); + context.completeNow(); + })); + } + + @Test + void testStreamingStatsHandlerFailure(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; + }; + + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/stats/stream"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + StreamStatsResponse statsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(statsResponse).isNotNull(); + assertThat(statsResponse.operationMode()).isEqualTo("NORMAL"); + context.completeNow(); + })); + } + + + static class StreamingStatsTestModule extends AbstractModule + { + static Supplier<StreamStatsResponse> streamingStatsSupplier; Review Comment: hmm, why not directly inject this in the constructor? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java: ########## @@ -146,6 +146,11 @@ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables); } + public String getOperationMode() Review Comment: I see a few missing `@Override` annotations. Can you please add them where they are missing in your PR? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream progress info + */ +public class ProgressInfo +{ + + public final String peer; + public final int sessionIndex; + public final String fileName; + public final String direction; + public final long currentBytes; + public final long totalBytes; + + public ProgressInfo(CompositeData data) + { + + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.fileName = (String) data.get("fileName"); + this.direction = (String) data.get("direction"); + this.currentBytes = (long) data.get("currentBytes"); + this.totalBytes = (long) data.get("totalBytes"); + } + + /** + * @return true if transfer is completed + */ + public boolean isCompleted() + { + return currentBytes >= totalBytes; + } + Review Comment: ```suggestion ``` ########## server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java: ########## @@ -410,6 +411,10 @@ public MetricsOperations metricsOperations() return fromAdapter(ICassandraAdapter::metricsOperations); } + public StreamManagerOperations streamManagerOperations() Review Comment: should go away ########## 
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; + +/** + * Class response for the StreamStats API + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class StreamStatsResponse Review Comment: I think we need to understand why the operation mode is required for this endpoint. maybe elaborate somewhere why we want the operation mode as part of this response. ########## server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StreamManagerOperations.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.server; + +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; + +/** + * An interface that defines interactions with the stream manager system in Cassandra. + */ +public interface StreamManagerOperations Review Comment: we should combine this into a `Statistics` interface that combines both the `MetricsOperations` and `StreamManagerOperations` interfaces ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) Review Comment: these helper methods should go at the end. ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; + +/** + * Class response for the StreamStats API + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class StreamStatsResponse +{ + @JsonProperty("operationMode") Review Comment: There are some formatting issues in this class. Also, methods should go below the constructor. Please refer to the Code Structure -> Class Layout section in the Cassandra [code style](https://cassandra.apache.org/_/development/code_style.html). ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. + */ + public long getTotalSizeReceived() Review Comment: ```suggestion public long totalSizeReceived() ``` ########## server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Session; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the stream stats endpoint with cassandra container. 
+ */ +@ExtendWith(VertxExtension.class) +public class StreamStatsIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, network = true, buildCluster = false) + void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws InterruptedException + { + + BBHelperDecommissioningNode.reset(); + UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster( + builder -> builder.withInstanceInitializer(BBHelperDecommissioningNode::install)); + IUpgradeableInstance node = cluster.get(5); + IUpgradeableInstance seed = cluster.get(1); + + createTestKeyspace(); + createTestTableAndPopulate(); + + startAsync("Decommission node" + node.config().num(), + () -> node.nodetoolResult("decommission").asserts().success()); + AtomicBoolean hasStats = new AtomicBoolean(false); + AtomicBoolean dataReceived = new AtomicBoolean(false); + + // Wait until nodes have reached expected state + awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); + + ClusterUtils.awaitRingState(seed, node, "Leaving"); + BBHelperDecommissioningNode.transientStateEnd.countDown(); + + for (int i = 0; i < 20; i++) Review Comment: why do it 20 times? 
########## server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java: ########## @@ -121,6 +122,19 @@ protected void ifMetricsOpsAvailable(RoutingContext context, ifAvailable.accept(operations); } + protected void ifStreamMgrOpsAvailable(RoutingContext context, + String host, + CassandraAdapterDelegate delegate, + Consumer<StreamManagerOperations> ifAvailable) + { + StreamManagerOperations operations = delegate.streamManagerOperations(); + if (operations == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } + ifAvailable.accept(operations); + } Review Comment: This helper is no longer necessary after https://issues.apache.org/jira/browse/CASSSIDECAR-182; please remove it. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. 
+ */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() Review Comment: ```suggestion public long totalSizeSent() ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. 
+ */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() + { + return getTotalFiles(sendingSummaries); + } + + private long getTotalSizes(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long getTotalSizeToReceive() Review Comment: ```suggestion public long totalSizeToReceive() ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. 
+ */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() + { + return getTotalFiles(sendingSummaries); + } + + private long getTotalSizes(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long getTotalSizeToReceive() + { + return getTotalSizes(receivingSummaries); + } + + private long getTotalSizeInProgress(Collection<ProgressInfo> streams) + { + long total = 0; + for (ProgressInfo stream : streams) + total += stream.currentBytes; + return total; + } + + /** + * @return total size(in bytes) to send in the session + */ + public long getTotalSizeToSend() Review Comment: ```suggestion public long totalSizeToSend() ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. + */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() Review Comment: ```suggestion public long totalFilesToReceive() ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. 
+ */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() + { + return getTotalFiles(sendingSummaries); + } + + private long getTotalSizes(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long getTotalSizeToReceive() + { + return getTotalSizes(receivingSummaries); + } + + private long getTotalSizeInProgress(Collection<ProgressInfo> streams) + { + long total = 0; + for (ProgressInfo stream : streams) + total += stream.currentBytes; + return total; + } + + /** + * @return total size(in bytes) to send in the session + */ + public long getTotalSizeToSend() + { + return getTotalSizes(sendingSummaries); + } + + private long getTotalFilesCompleted(Collection<ProgressInfo> files) + { + Iterable<ProgressInfo> completed = Iterables.filter(files, input -> input.isCompleted()); + return Iterables.size(completed); + } + + /** + * @return total number of files already received. + */ + public long getTotalFilesReceived() + { + return getTotalFilesCompleted(receivingFiles); + } + + /** + * @return total number of files already sent. 
+ */ + public long getTotalFilesSent() Review Comment: ```suggestion public long totalFilesSent() ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. 
+ */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() + { + return getTotalFiles(sendingSummaries); + } + + private long getTotalSizes(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long getTotalSizeToReceive() + { + return getTotalSizes(receivingSummaries); + } + + private long getTotalSizeInProgress(Collection<ProgressInfo> streams) + { + long total = 0; + for (ProgressInfo stream : streams) + total += stream.currentBytes; + return total; + } + + /** + * @return total size(in bytes) to send in the session + */ + public long getTotalSizeToSend() + { + return getTotalSizes(sendingSummaries); + } + + private long getTotalFilesCompleted(Collection<ProgressInfo> files) + { + Iterable<ProgressInfo> completed = Iterables.filter(files, input -> input.isCompleted()); + return Iterables.size(completed); + } + + /** + * @return total number of files already received. + */ + public long getTotalFilesReceived() Review Comment: ```suggestion public long totalFilesReceived() ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import com.google.inject.Inject; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; + +/** + * Handler for retrieving node streams stats + */ +public class StreamStatsHandler extends AbstractHandler<Void> +{ + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the metadata fetcher + * @param executorPools executor pools for blocking executions + */ + @Inject + protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools) + { + super(metadataFetcher, executorPools, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + 
HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + Void request) + { + + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + if (delegate == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } + + StorageOperations storageOperations = delegate.storageOperations(); + StreamManagerOperations streamMgrOperations = delegate.streamManagerOperations(); + if (storageOperations == null || streamMgrOperations == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } + + executorPools.service() + .executeBlocking(() -> { + String mode = storageOperations.getOperationMode(); + StreamProgressStats stats = streamMgrOperations.getStreamProgressStats(); + return new StreamStatsResponse(mode, stats); + }) + .onSuccess(context::json) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)); + + } + + protected Void extractParamsOrThrow(RoutingContext context) Review Comment: let's add the `@Override` annotation here. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import com.google.common.collect.Iterables; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + + public final Collection<ProgressInfo> receivingFiles; + public final Collection<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = (String) data.get("peer"); + this.sessionIndex = (int) data.get("sessionIndex"); + this.connecting = (String) data.get("connecting"); + this.receivingSummaries = parseSummaries((CompositeData[]) data.get("receivingSummaries")); + this.sendingSummaries = parseSummaries((CompositeData[]) data.get("sendingSummaries")); + this.state = (String) data.get("state"); + this.receivingFiles = parseFiles((CompositeData[]) data.get("receivingFiles")); + this.sendingFiles = parseFiles((CompositeData[]) data.get("sendingFiles")); + } + + private Collection<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private Collection<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long getTotalFiles(Collection<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + /** + * @return total size(in bytes) already received. 
+ */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() Review Comment: ```suggestion public long totalFilesToSend() ``` ########## client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java: ########## @@ -664,6 +665,18 @@ public CompletableFuture<ListOperationalJobsResponse> listOperationalJobs(Sideca .build()); } + /** + * Executes the stream stats request using the default retry policy and configured selection policy + * + * @return a completable future of the connected client stats + */ + public CompletableFuture<StreamStatsResponse> streamsStats() Review Comment: this request should be directed at a single instance I assume? ########## server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import com.google.inject.Inject; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; + +/** + * Handler for retrieving node streams stats + */ +public class StreamStatsHandler extends AbstractHandler<Void> +{ + Review Comment: ```suggestion ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java: ########## @@ -125,6 +125,7 @@ public final class ApiEndpointsV1 public static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + OPERATIONAL_JOB_ID_PATH_PARAM; public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; + public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/stream"; Review Comment: Should be plural here if you are retrieving all the streams ```suggestion public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream summary data + */ +public class StreamSummary +{ + public final String tableId; + + /** + * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. + */ + public final int files; + public final long totalSize; + + + public StreamSummary(String tableId, int files, long totalSize) + { + this.tableId = tableId; + this.files = files; + this.totalSize = totalSize; + } Review Comment: this constructor is unused. do we need it? 
########## client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java: ########## @@ -664,6 +665,18 @@ public CompletableFuture<ListOperationalJobsResponse> listOperationalJobs(Sideca .build()); } + /** + * Executes the stream stats request using the default retry policy and configured selection policy Review Comment: ```suggestion * Executes the streams stats request using the default retry policy and configured selection policy ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java: ########## @@ -159,4 +159,10 @@ public interface StorageJmxOperations * @throws InterruptedException it does not really throw but declared in MBean */ int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException; + + /** + * Fetch the operation-mode of the node + * @return string representation of theoperation-mode Review Comment: ```suggestion * @return string representation of the operation-mode ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +/** + * Representation of the stream state data + */ +public class StreamState +{ + Collection<SessionInfo> sessions; + + public StreamState(CompositeData data) + { + this.sessions = parseSessions((CompositeData[]) data.get("sessions")); + } + + private Collection<SessionInfo> parseSessions(CompositeData[] sessions) Review Comment: let's move helper methods to the end of the class ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java: ########## @@ -1573,6 +1575,36 @@ public void onError(Throwable throwable) assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testStreamsStats() throws Exception + { + String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," + + "\"streamProgressStats\":{\"totalFilesToReceive\":7," + Review Comment: ```suggestion "\"streamsProgressStats\":{\"totalFilesToReceive\":7," + ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStreamManagerOperations.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.adapters.base.data.SessionInfo; +import org.apache.cassandra.sidecar.adapters.base.data.StreamState; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; + +import static org.apache.cassandra.sidecar.adapters.base.StreamManagerJmxOperations.STREAM_MANAGER_OBJ_NAME; + +/** + * An implementation of the {@link StreamManagerOperations} that interfaces with Cassandra 4.0 and later + */ +public class CassandraStreamManagerOperations implements StreamManagerOperations +{ + + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraStorageOperations.class); + protected final JmxClient jmxClient; + + /** + * Creates a new instance with the provided {@link JmxClient} and {@link DnsResolver} + * + * @param jmxClient the JMX client used to communicate with the Cassandra instance + * @param dnsResolver the DNS resolver used to lookup replicas + */ + public CassandraStreamManagerOperations(JmxClient jmxClient, DnsResolver dnsResolver) + { + this.jmxClient = jmxClient; + } + + public StreamProgressStats 
getStreamProgressStats() + { + Set<CompositeData> streamData = jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME) + .getCurrentStreams(); + + List<StreamState> streamStates = streamData.stream().map(StreamState::new).collect(Collectors.toList()); + return computeStats(streamStates); + } + + private StreamProgressStats computeStats(List<StreamState> streamStates) + { + List<SessionInfo> sessions = streamStates.stream().map(s -> s.sessions()).flatMap(Collection::stream).collect(Collectors.toList()); + + long totalFilesToReceive = 0; + long totalFilesReceived = 0; + long totalBytesToReceive = 0; + long totalBytesReceived = 0; + + long totalFilesToSend = 0; + long totalFilesSent = 0; + long totalBytesToSend = 0; + long totalBytesSent = 0; + + for (SessionInfo s : sessions) + { + totalBytesToReceive += s.getTotalSizeToReceive(); + totalBytesReceived += s.getTotalSizeReceived(); + totalFilesToReceive += s.getTotalFilesToReceive(); + totalFilesReceived += s.getTotalFilesReceived(); + totalBytesToSend += s.getTotalSizeToSend(); + totalBytesSent += s.getTotalSizeSent(); + totalFilesToSend += s.getTotalFilesToSend(); + totalFilesSent += s.getTotalFilesSent(); + + } + LOGGER.info("Progress Stats: {}, {}, {}, {}", totalBytesToReceive, totalBytesReceived, totalBytesToSend, totalBytesSent); Review Comment: hmm, should we move this to debug instead? ########## server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import 
org.apache.cassandra.sidecar.server.Server; +import org.mockito.stubbing.Answer; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link StreamStatsHandler} + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsHandlerTest +{ + + static final Logger LOGGER = LoggerFactory.getLogger(StreamStatsHandlerTest.class); + Vertx vertx; + Server server; + + @BeforeEach + void before() throws InterruptedException + { + Module testOverride = Modules.override(new TestModule()) + .with(new StreamingStatsTestModule()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testStreamingStatsHandler(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; + }; + + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/stats/stream"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + 
assertThat(response.statusCode()).isEqualTo(OK.code()); + StreamStatsResponse statsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(statsResponse).isNotNull(); + assertThat(statsResponse.operationMode()).isEqualTo("NORMAL"); + context.completeNow(); + })); + } + + @Test + void testStreamingStatsHandlerFailure(VertxTestContext context) Review Comment: this test is exactly the same as the `testStreamingStatsHandler` test. Are we really testing failure here? ########## server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.StreamManagerOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.stubbing.Answer; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link StreamStatsHandler} + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsHandlerTest +{ + + static 
final Logger LOGGER = LoggerFactory.getLogger(StreamStatsHandlerTest.class); + Vertx vertx; + Server server; + + @BeforeEach + void before() throws InterruptedException + { + Module testOverride = Modules.override(new TestModule()) + .with(new StreamingStatsTestModule()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testStreamingStatsHandler(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; + }; + + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/stats/stream"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + StreamStatsResponse statsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(statsResponse).isNotNull(); + assertThat(statsResponse.operationMode()).isEqualTo("NORMAL"); + context.completeNow(); + })); + } + + @Test + void testStreamingStatsHandlerFailure(VertxTestContext context) + { + StreamingStatsTestModule.streamingStatsSupplier = () -> { + StreamStatsResponse response = new 
StreamStatsResponse("NORMAL", + new StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; + }; + + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/stats/stream"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + StreamStatsResponse statsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(statsResponse).isNotNull(); + assertThat(statsResponse.operationMode()).isEqualTo("NORMAL"); + context.completeNow(); + })); + } + + + static class StreamingStatsTestModule extends AbstractModule + { + static Supplier<StreamStatsResponse> streamingStatsSupplier; + + @Provides + @Singleton + public InstancesConfig instanceConfig() throws IOException Review Comment: ```suggestion public InstancesConfig instanceConfig() ``` ########## server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Session; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the stream stats endpoint with cassandra container. 
+ */ +@ExtendWith(VertxExtension.class) +public class StreamStatsIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, network = true, buildCluster = false) + void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws InterruptedException + { + + BBHelperDecommissioningNode.reset(); + UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster( + builder -> builder.withInstanceInitializer(BBHelperDecommissioningNode::install)); + IUpgradeableInstance node = cluster.get(5); + IUpgradeableInstance seed = cluster.get(1); + + createTestKeyspace(); + createTestTableAndPopulate(); + + startAsync("Decommission node" + node.config().num(), + () -> node.nodetoolResult("decommission").asserts().success()); + AtomicBoolean hasStats = new AtomicBoolean(false); + AtomicBoolean dataReceived = new AtomicBoolean(false); + + // Wait until nodes have reached expected state + awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); + + ClusterUtils.awaitRingState(seed, node, "Leaving"); + BBHelperDecommissioningNode.transientStateEnd.countDown(); + + for (int i = 0; i < 20; i++) + { + startAsync("Request-" + i , () -> { + try + { + streamStats(context, hasStats, dataReceived); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + assertThat(hasStats).isTrue(); + assertThat(dataReceived).isTrue(); + context.completeNow(); + context.awaitCompletion(2, TimeUnit.MINUTES); + } + + private void streamStats(VertxTestContext context, AtomicBoolean hasStats, AtomicBoolean dataReceived) throws Exception + { + String testRoute = "/api/v1/cassandra/stats/stream"; + testWithClient(context, client -> { + BBHelperDecommissioningNode.transientStateEnd.countDown(); + client.get(server.actualPort(), "127.0.0.1", testRoute) 
+ .send(context.succeeding(response -> { + assertRingResponseOK(response, sidecarTestContext, hasStats, dataReceived); + })); + }); + } + + void assertRingResponseOK(HttpResponse<Buffer> response, CassandraSidecarTestContext cassandraTestContext, Review Comment: this is not really a ring response? Also cassandraTestContext is unused ```suggestion void assertRingResponseOK(HttpResponse<Buffer> response, ``` ########## spotbugs-exclude.xml: ########## @@ -62,7 +62,10 @@ <Match> <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" /> - <Class name="org.apache.cassandra.sidecar.routes.RingHandlerTest" /> + <Or> + <Class name="org.apache.cassandra.sidecar.routes.RingHandlerTest" /> + <Class name="org.apache.cassandra.sidecar.routes.StreamStatsHandlerTest" /> Review Comment: let's not statically set the fields ########## server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Session; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the stream stats endpoint with cassandra container. 
+ */ +@ExtendWith(VertxExtension.class) +public class StreamStatsIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, network = true, buildCluster = false) + void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws InterruptedException + { + Review Comment: ```suggestion ``` ########## server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Session; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the stream stats endpoint with cassandra container. 
+ */ +@ExtendWith(VertxExtension.class) +public class StreamStatsIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, network = true, buildCluster = false) Review Comment: Do we need 5 nodes? One of our goals is to reduce the runtime and resources needed to run tests, so the cluster size should be chosen carefully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

