bbotella commented on code in PR #166:
URL: https://github.com/apache/cassandra-sidecar/pull/166#discussion_r1897997868


##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream progress info
+ */
+public class ProgressInfo
+{
+
+    public final String peer;
+    public final int sessionIndex;
+    public final String fileName;
+    public final String direction;
+    public final long currentBytes;
+    public final long totalBytes;
+
+    public ProgressInfo(CompositeData data)
+    {
+

Review Comment:
   Remove the extra blank line.



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, 
network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, 
ConfigurableCassandraTestContext cassandraTestContext) throws 
InterruptedException
+    {
+
+        BBHelperDecommissioningNode.reset();
+        UpgradeableCluster cluster = 
cassandraTestContext.configureAndStartCluster(
+        builder -> 
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
+        IUpgradeableInstance node = cluster.get(5);
+        IUpgradeableInstance seed = cluster.get(1);
+
+        createTestKeyspace();
+        createTestTableAndPopulate();
+
+        startAsync("Decommission node" + node.config().num(),
+                   () -> 
node.nodetoolResult("decommission").asserts().success());
+        AtomicBoolean hasStats = new AtomicBoolean(false);
+        AtomicBoolean dataReceived = new AtomicBoolean(false);
+
+        // Wait until nodes have reached expected state
+        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, 
TimeUnit.MINUTES, "transientStateStart");
+
+        ClusterUtils.awaitRingState(seed, node, "Leaving");
+        BBHelperDecommissioningNode.transientStateEnd.countDown();
+
+        for (int i = 0; i < 20; i++)
+        {
+            startAsync("Request-" + i , () -> {
+                try
+                {
+                    streamStats(context, hasStats, dataReceived);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+
+        assertThat(hasStats).isTrue();
+        assertThat(dataReceived).isTrue();
+        context.completeNow();
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void streamStats(VertxTestContext context, AtomicBoolean hasStats, 
AtomicBoolean dataReceived) throws Exception
+    {
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        testWithClient(context, client -> {
+            BBHelperDecommissioningNode.transientStateEnd.countDown();
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .send(context.succeeding(response -> {
+                       assertRingResponseOK(response, sidecarTestContext, 
hasStats, dataReceived);
+                  }));
+        });
+    }
+
+    void assertRingResponseOK(HttpResponse<Buffer> response, 
CassandraSidecarTestContext cassandraTestContext,
+                              AtomicBoolean hasStats, AtomicBoolean 
dataReceived)
+    {
+        StreamStatsResponse streamStatsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+        assertThat(streamStatsResponse).isNotNull();
+        StreamProgressStats streamProgress = 
streamStatsResponse.streamProgressStats();
+        assertThat(streamProgress).isNotNull();
+        if (streamProgress.totalFilesToReceive() > 0)
+        {
+            hasStats.set(true);
+            if (streamProgress.totalFilesToReceive() == 
streamProgress.totalFilesReceived() &&

Review Comment:
   Do we need an else here?



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamProgressStats.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A class representing stats summarizing the progress of streamed bytes and 
files on the node
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class StreamProgressStats
+{
+    @JsonProperty("totalFilesToReceive")
+    public long totalFilesToReceive()
+    {
+        return totalFilesToReceive;
+    }
+
+    @JsonProperty("totalFilesReceived")
+    public long totalFilesReceived()
+    {
+        return totalFilesReceived;
+    }
+
+    @JsonProperty("totalBytesToReceive")
+    public long totalBytesToReceive()
+    {
+        return totalBytesToReceive;
+    }
+
+    @JsonProperty("totalBytesReceived")
+    public long totalBytesReceived()
+    {
+        return totalBytesReceived;
+    }
+
+    @JsonProperty("totalFilesToSend")
+    public long totalFilesToSend()
+    {
+        return totalFilesToSend;
+    }
+
+    @JsonProperty("totalFilesSent")
+    public long totalFilesSent()
+    {
+        return totalFilesSent;
+    }
+
+    @JsonProperty("totalBytesToSend")
+    public long totalBytesToSend()
+    {
+        return totalBytesToSend;
+    }
+
+    @JsonProperty("totalBytesSent")
+    public long totalBytesSent()
+    {
+        return totalBytesSent;
+    }
+
+    /* Across all sessions
+         * private final long sendProgress; from "sent to"

Review Comment:
   ```suggestion
            private final long sendProgress; from "sent to"
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java:
##########
@@ -372,6 +374,9 @@ public Router vertxRouter(Vertx vertx,
         router.get(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE)
               .handler(listOperationalJobsHandler);
 
+        router.get(ApiEndpointsV1.STREAM_STATS_ROUTE)

Review Comment:
   If (hopefully when :-P ) [this 
PR](https://github.com/apache/cassandra-sidecar/pull/164) gets merged, I would 
love to see the new API definition updated there as well. Let's work together 
to make that happen.



##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1573,6 +1575,36 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," +
+                                             
"\"streamProgressStats\":{\"totalFilesToReceive\":7," +
+                                             
"\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," +
+                                             
"\"totalBytesReceived\":15088,\"totalFilesToSend\":0,\"totalFilesSent\":0," +
+                                             
"\"totalBytesToSend\":0,\"totalBytesSent\":0}}";
+
+        MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(streamStatsResponseAsString);
+        enqueue(response);
+        StreamStatsResponse result = client.streamsStats().get();
+
+        assertThat(result).isNotNull();
+        assertThat(result.operationMode()).isNotNull().isEqualTo("NORMAL");
+        StreamProgressStats progressStats = result.streamProgressStats();
+        assertThat(progressStats).isNotNull();
+        assertThat(progressStats.totalBytesReceived()).isNotNull();
+        assertThat(progressStats.totalBytesSent()).isNotNull();
+        
assertThat(progressStats.totalBytesToReceive()).isNotNull().isEqualTo(progressStats.totalBytesReceived());
+        assertThat(progressStats.totalBytesToSend()).isNotNull();
+        assertThat(progressStats.totalFilesToReceive()).isNotNull();
+        assertThat(progressStats.totalFilesToSend()).isNotNull();
+        
assertThat(progressStats.totalFilesReceived()).isNotNull().isEqualTo(progressStats.totalFilesToReceive());
+        assertThat(progressStats.totalFilesSent()).isNotNull();
+

Review Comment:
   ```suggestion
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStreamManagerOperations.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.adapters.base.data.SessionInfo;
+import org.apache.cassandra.sidecar.adapters.base.data.StreamState;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.StreamManagerJmxOperations.STREAM_MANAGER_OBJ_NAME;
+
+/**
+ * An implementation of the {@link StreamManagerOperations} that interfaces 
with Cassandra 4.0 and later
+ */
+public class CassandraStreamManagerOperations implements 
StreamManagerOperations
+{
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraStorageOperations.class);
+    protected final JmxClient jmxClient;
+
+    /**
+     * Creates a new instance with the provided {@link JmxClient} and {@link 
DnsResolver}
+     *
+     * @param jmxClient   the JMX client used to communicate with the 
Cassandra instance
+     * @param dnsResolver the DNS resolver used to lookup replicas
+     */
+    public CassandraStreamManagerOperations(JmxClient jmxClient, DnsResolver 
dnsResolver)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public StreamProgressStats getStreamProgressStats()
+    {
+        Set<CompositeData> streamData = 
jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME)
+                                                 .getCurrentStreams();
+
+        List<StreamState> streamStates = 
streamData.stream().map(StreamState::new).collect(Collectors.toList());
+        return computeStats(streamStates);
+    }
+
+    private StreamProgressStats computeStats(List<StreamState> streamStates)
+    {
+        List<SessionInfo> sessions = streamStates.stream().map(s -> 
s.sessions()).flatMap(Collection::stream).collect(Collectors.toList());

Review Comment:
   I understand this is by no means as hot a path as the ones the [dev 
thread](https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36) was 
referring to, but I wonder if we should stick with Sidecar's choice of avoiding 
streams in non-test code.
   
   The same applies to the other streams in this PR.



##########
server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.stubbing.Answer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StreamStatsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsHandlerTest
+{
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new StreamingStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamingStatsHandler(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));

Review Comment:
   Same comment here about the Quick Theories testing library.



##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1573,6 +1575,36 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," +
+                                             
"\"streamProgressStats\":{\"totalFilesToReceive\":7," +
+                                             
"\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," +

Review Comment:
   These numbers make me wonder if it would be worth considering adding the [Quick 
Theories](https://github.com/quicktheories/QuickTheories) testing library to 
sidecar. I could see the case for leaving that out of this PR, but it would 
definitely increase coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to