frankgh commented on code in PR #166:
URL: https://github.com/apache/cassandra-sidecar/pull/166#discussion_r1894420568


##########
server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.stubbing.Answer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StreamStatsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsHandlerTest
+{
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new StreamingStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamingStatsHandler(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;

Review Comment:
   NIT 
   ```suggestion
               return new StreamStatsResponse("NORMAL",
                                                                      new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
   ```



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamProgressStats.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A class representing stats summarizing the progress of streamed bytes and 
files on the node
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class StreamProgressStats
+{
+    @JsonProperty("totalFilesToReceive")
+    public long totalFilesToReceive()

Review Comment:
   class layout is wrong, can you please follow the code style guidelines.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()
+    {
+        return getTotalFiles(sendingSummaries);
+    }
+
+    private long getTotalSizes(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to receive in the session
+     */
+    public long getTotalSizeToReceive()
+    {
+        return getTotalSizes(receivingSummaries);
+    }
+
+    private long getTotalSizeInProgress(Collection<ProgressInfo> streams)
+    {
+        long total = 0;
+        for (ProgressInfo stream : streams)
+            total += stream.currentBytes;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to send in the session
+     */
+    public long getTotalSizeToSend()
+    {
+        return getTotalSizes(sendingSummaries);
+    }
+
+    private long getTotalFilesCompleted(Collection<ProgressInfo> files)
+    {
+        Iterable<ProgressInfo> completed = Iterables.filter(files, input -> 
input.isCompleted());
+        return Iterables.size(completed);
+    }
+
+    /**
+     * @return total number of files already received.
+     */
+    public long getTotalFilesReceived()
+    {
+        return getTotalFilesCompleted(receivingFiles);
+    }
+
+    /**
+     * @return total number of files already sent.
+     */
+    public long getTotalFilesSent()
+    {
+        return getTotalFilesCompleted(sendingFiles);
+    }
+
+

Review Comment:
   ```suggestion
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java:
##########
@@ -176,6 +177,15 @@ public MetricsOperations metricsOperations()
         return new CassandraMetricsOperations(cqlSessionProvider);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public StreamManagerOperations streamManagerOperations()

Review Comment:
   I think this and `MetricsOperations` are essentially  the same. We should 
fold stats into a single interface. Conceptually abstracts access to 
functionality behind Cassandra. The abstraction here is access to statistics 
from Cassandra. I think `MetricsOperations` does not really capture what we are 
trying to abstract, and adding another interface here doesn't make sense.
   
   I would avoid trying to have a one-to-one mapping from interface to C* bean 
here because in C*'s code base the functionality was exposed through a small 
set of beans. In recent contributions to the C* codebase I've seen that we are 
making a better effort to separate bean access into something that makes sense 
logically. For that reason we should really think about some abstraction that 
makes sense from a point of view of what we are trying to access, and maybe not 
focus too much on how things are exposed in Cassandra 



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream summary data
+ */
+public class StreamSummary
+{
+    public final String tableId;
+
+    /**
+     * Number of files to transfer. Can be 0 if nothing to transfer for some 
streaming request.
+     */
+    public final int files;
+    public final long totalSize;
+

Review Comment:
   ```suggestion
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream state data
+ */
+public class StreamState
+{
+    Collection<SessionInfo> sessions;
+
+    public StreamState(CompositeData data)
+    {
+        this.sessions = parseSessions((CompositeData[]) data.get("sessions"));
+    }
+
+    private Collection<SessionInfo> parseSessions(CompositeData[] sessions)
+    {
+        return 
Arrays.stream(sessions).map(SessionInfo::new).collect(Collectors.toList());

Review Comment:
   also agree with Bernardo that streams should be used carefully. This is less 
readable to me than a good old for loop.



##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1573,6 +1575,36 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," +
+                                             
"\"streamProgressStats\":{\"totalFilesToReceive\":7," +
+                                             
"\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," +
+                                             
"\"totalBytesReceived\":15088,\"totalFilesToSend\":0,\"totalFilesSent\":0," +
+                                             
"\"totalBytesToSend\":0,\"totalBytesSent\":0}}";
+
+        MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(streamStatsResponseAsString);
+        enqueue(response);
+        StreamStatsResponse result = client.streamsStats().get();
+
+        assertThat(result).isNotNull();
+        assertThat(result.operationMode()).isNotNull().isEqualTo("NORMAL");
+        StreamProgressStats progressStats = result.streamProgressStats();
+        assertThat(progressStats).isNotNull();
+        assertThat(progressStats.totalBytesReceived()).isNotNull();

Review Comment:
   no validation of the actual values?



##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1573,6 +1575,36 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," +
+                                             
"\"streamProgressStats\":{\"totalFilesToReceive\":7," +
+                                             
"\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," +
+                                             
"\"totalBytesReceived\":15088,\"totalFilesToSend\":0,\"totalFilesSent\":0," +
+                                             
"\"totalBytesToSend\":0,\"totalBytesSent\":0}}";

Review Comment:
   maybe let's try with non-zero values, since 0 is the default



##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java:
##########
@@ -102,4 +102,6 @@ default void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tab
     {
         outOfRangeDataCleanup(keyspace, table, 1);
     }
+
+    String getOperationMode();

Review Comment:
   should be `operationMode` and there are no javadocs here
   ```suggestion
       String operationMode();
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+
+/**
+ * Handler for retrieving node streams stats
+ */
+public class StreamStatsHandler extends AbstractHandler<Void>
+{
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the metadata fetcher
+     * @param executorPools   executor pools for blocking executions
+     */
+    @Inject
+    protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher,
+                                 ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, null);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               Void request)
+    {
+
+        CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+        if (delegate == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+
+        StorageOperations storageOperations = delegate.storageOperations();
+        StreamManagerOperations streamMgrOperations = 
delegate.streamManagerOperations();
+        if (storageOperations == null || streamMgrOperations == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }

Review Comment:
   this changed in https://issues.apache.org/jira/browse/CASSSIDECAR-182, and 
should be simplified to not do any null checks.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream progress info
+ */
+public class ProgressInfo
+{
+

Review Comment:
   ```suggestion
   ```



##########
server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.stubbing.Answer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StreamStatsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsHandlerTest
+{
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new StreamingStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamingStatsHandler(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;
+        };
+
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  StreamStatsResponse statsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.operationMode()).isEqualTo("NORMAL");
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testStreamingStatsHandlerFailure(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;
+        };
+
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  StreamStatsResponse statsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.operationMode()).isEqualTo("NORMAL");
+                  context.completeNow();
+              }));
+    }
+
+
+    static class StreamingStatsTestModule extends AbstractModule
+    {
+        static Supplier<StreamStatsResponse> streamingStatsSupplier;

Review Comment:
   hmm, why not directly inject this in the constructor?



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java:
##########
@@ -146,6 +146,11 @@ public int forceKeyspaceCleanup(int jobs, String 
keyspaceName, String... tables)
         return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables);
     }
 
+    public String getOperationMode()

Review Comment:
   I see a few `@Override` annotations. Can you please add them where they are 
missing in your PR?



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream progress info
+ */
+public class ProgressInfo
+{
+
+    public final String peer;
+    public final int sessionIndex;
+    public final String fileName;
+    public final String direction;
+    public final long currentBytes;
+    public final long totalBytes;
+
+    public ProgressInfo(CompositeData data)
+    {
+
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.fileName = (String) data.get("fileName");
+        this.direction = (String) data.get("direction");
+        this.currentBytes = (long) data.get("currentBytes");
+        this.totalBytes = (long) data.get("totalBytes");
+    }
+
+    /**
+     * @return true if transfer is completed
+     */
+    public boolean isCompleted()
+    {
+        return currentBytes >= totalBytes;
+    }
+

Review Comment:
   ```suggestion
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -410,6 +411,10 @@ public MetricsOperations metricsOperations()
         return fromAdapter(ICassandraAdapter::metricsOperations);
     }
 
+    public StreamManagerOperations streamManagerOperations()

Review Comment:
   should go away



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+
+/**
+ * Class response for the StreamStats API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class StreamStatsResponse

Review Comment:
   I think we need to understand why the operation mode is required for this 
endpoint. maybe elaborate somewhere why we want the operation mode as part of 
this response.



##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StreamManagerOperations.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server;
+
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+
+/**
+ * An interface that defines interactions with the stream manager system in 
Cassandra.
+ */
+public interface StreamManagerOperations

Review Comment:
   we should combine this into a `Statistics` interface that combines both the 
`MetricsOperations` and `StreamManagerOperations` interfaces



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)

Review Comment:
   these helper methods should go at the end.



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+
+/**
+ * Class response for the StreamStats API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class StreamStatsResponse
+{
+   @JsonProperty("operationMode")

Review Comment:
   there are some formatting issues in this class. Also methods should go below 
the constructor. Please refer to the Code Structure -> Class Layout section in 
the Cassandra [code 
style](https://cassandra.apache.org/_/development/code_style.html)



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()

Review Comment:
   ```suggestion
       public long totalSizeReceived()
   ```



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, 
network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, 
ConfigurableCassandraTestContext cassandraTestContext) throws 
InterruptedException
+    {
+
+        BBHelperDecommissioningNode.reset();
+        UpgradeableCluster cluster = 
cassandraTestContext.configureAndStartCluster(
+        builder -> 
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
+        IUpgradeableInstance node = cluster.get(5);
+        IUpgradeableInstance seed = cluster.get(1);
+
+        createTestKeyspace();
+        createTestTableAndPopulate();
+
+        startAsync("Decommission node" + node.config().num(),
+                   () -> 
node.nodetoolResult("decommission").asserts().success());
+        AtomicBoolean hasStats = new AtomicBoolean(false);
+        AtomicBoolean dataReceived = new AtomicBoolean(false);
+
+        // Wait until nodes have reached expected state
+        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, 
TimeUnit.MINUTES, "transientStateStart");
+
+        ClusterUtils.awaitRingState(seed, node, "Leaving");
+        BBHelperDecommissioningNode.transientStateEnd.countDown();
+
+        for (int i = 0; i < 20; i++)

Review Comment:
   why do it 20 times?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java:
##########
@@ -121,6 +122,19 @@ protected void ifMetricsOpsAvailable(RoutingContext 
context,
          ifAvailable.accept(operations);
     }
 
+    protected void ifStreamMgrOpsAvailable(RoutingContext context,
+                                           String host,
+                                           CassandraAdapterDelegate delegate,
+                                           Consumer<StreamManagerOperations> 
ifAvailable)
+    {
+        StreamManagerOperations operations = 
delegate.streamManagerOperations();
+        if (operations == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+        ifAvailable.accept(operations);
+    }

Review Comment:
   this is no longer necessary after 
https://issues.apache.org/jira/browse/CASSSIDECAR-182, please remove



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()

Review Comment:
   ```suggestion
       public long totalSizeSent()
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()
+    {
+        return getTotalFiles(sendingSummaries);
+    }
+
+    private long getTotalSizes(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to receive in the session
+     */
+    public long getTotalSizeToReceive()

Review Comment:
   ```suggestion
       public long totalSizeToReceive()
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()
+    {
+        return getTotalFiles(sendingSummaries);
+    }
+
+    private long getTotalSizes(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to receive in the session
+     */
+    public long getTotalSizeToReceive()
+    {
+        return getTotalSizes(receivingSummaries);
+    }
+
+    private long getTotalSizeInProgress(Collection<ProgressInfo> streams)
+    {
+        long total = 0;
+        for (ProgressInfo stream : streams)
+            total += stream.currentBytes;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to send in the session
+     */
+    public long getTotalSizeToSend()

Review Comment:
   ```suggestion
       public long totalSizeToSend()
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()

Review Comment:
   ```suggestion
       public long totalFilesToReceive()
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()
+    {
+        return getTotalFiles(sendingSummaries);
+    }
+
+    private long getTotalSizes(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to receive in the session
+     */
+    public long getTotalSizeToReceive()
+    {
+        return getTotalSizes(receivingSummaries);
+    }
+
+    private long getTotalSizeInProgress(Collection<ProgressInfo> streams)
+    {
+        long total = 0;
+        for (ProgressInfo stream : streams)
+            total += stream.currentBytes;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to send in the session
+     */
+    public long getTotalSizeToSend()
+    {
+        return getTotalSizes(sendingSummaries);
+    }
+
+    private long getTotalFilesCompleted(Collection<ProgressInfo> files)
+    {
+        Iterable<ProgressInfo> completed = Iterables.filter(files, input -> 
input.isCompleted());
+        return Iterables.size(completed);
+    }
+
+    /**
+     * @return total number of files already received.
+     */
+    public long getTotalFilesReceived()
+    {
+        return getTotalFilesCompleted(receivingFiles);
+    }
+
+    /**
+     * @return total number of files already sent.
+     */
+    public long getTotalFilesSent()

Review Comment:
   ```suggestion
       public long totalFilesSent()
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()
+    {
+        return getTotalFiles(sendingSummaries);
+    }
+
+    private long getTotalSizes(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to receive in the session
+     */
+    public long getTotalSizeToReceive()
+    {
+        return getTotalSizes(receivingSummaries);
+    }
+
+    private long getTotalSizeInProgress(Collection<ProgressInfo> streams)
+    {
+        long total = 0;
+        for (ProgressInfo stream : streams)
+            total += stream.currentBytes;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) to send in the session
+     */
+    public long getTotalSizeToSend()
+    {
+        return getTotalSizes(sendingSummaries);
+    }
+
+    private long getTotalFilesCompleted(Collection<ProgressInfo> files)
+    {
+        Iterable<ProgressInfo> completed = Iterables.filter(files, input -> 
input.isCompleted());
+        return Iterables.size(completed);
+    }
+
+    /**
+     * @return total number of files already received.
+     */
+    public long getTotalFilesReceived()

Review Comment:
   ```suggestion
       public long totalFilesReceived()
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+
+/**
+ * Handler for retrieving node streams stats
+ */
+public class StreamStatsHandler extends AbstractHandler<Void>
+{
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the metadata fetcher
+     * @param executorPools   executor pools for blocking executions
+     */
+    @Inject
+    protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher,
+                                 ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, null);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               Void request)
+    {
+
+        CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+        if (delegate == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+
+        StorageOperations storageOperations = delegate.storageOperations();
+        StreamManagerOperations streamMgrOperations = 
delegate.streamManagerOperations();
+        if (storageOperations == null || streamMgrOperations == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+
+        executorPools.service()
+                     .executeBlocking(() -> {
+                         String mode = storageOperations.getOperationMode();
+                         StreamProgressStats stats = 
streamMgrOperations.getStreamProgressStats();
+                         return new StreamStatsResponse(mode, stats);
+                     })
+                     .onSuccess(context::json)
+                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
+
+    }
+
+    protected Void extractParamsOrThrow(RoutingContext context)

Review Comment:
   let's add the `@Override` annotation here.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+
+    public final Collection<ProgressInfo> receivingFiles;
+    public final Collection<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.connecting = (String) data.get("connecting");
+        this.receivingSummaries = parseSummaries((CompositeData[]) 
data.get("receivingSummaries"));
+        this.sendingSummaries = parseSummaries((CompositeData[]) 
data.get("sendingSummaries"));
+        this.state = (String) data.get("state");
+        this.receivingFiles = parseFiles((CompositeData[]) 
data.get("receivingFiles"));
+        this.sendingFiles = parseFiles((CompositeData[]) 
data.get("sendingFiles"));
+    }
+
+    private Collection<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private Collection<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()

Review Comment:
   ```suggestion
       public long totalFilesToSend()
   ```



##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -664,6 +665,18 @@ public CompletableFuture<ListOperationalJobsResponse> 
listOperationalJobs(Sideca
                                             .build());
     }
 
+    /**
+     * Executes the stream stats request using the default retry policy and 
configured selection policy
+     *
+     * @return a completable future of the connected client stats
+     */
+    public CompletableFuture<StreamStatsResponse> streamsStats()

Review Comment:
   this request should be directed at a single instance I assume?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+
+/**
+ * Handler for retrieving node streams stats
+ */
+public class StreamStatsHandler extends AbstractHandler<Void>
+{
+

Review Comment:
   ```suggestion
   ```



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java:
##########
@@ -125,6 +125,7 @@ public final class ApiEndpointsV1
     public static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + 
OPERATIONAL_JOB_ID_PATH_PARAM;
     public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + 
CASSANDRA + OPERATIONAL_JOBS;
     public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + 
PER_OPERATIONAL_JOB;
+    public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + 
"/stats/stream";

Review Comment:
   Should be plural here if you are retrieving all the streams
   ```suggestion
       public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + 
"/stats/streams";
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream summary data
+ */
+public class StreamSummary
+{
+    public final String tableId;
+
+    /**
+     * Number of files to transfer. Can be 0 if nothing to transfer for some 
streaming request.
+     */
+    public final int files;
+    public final long totalSize;
+
+
+    public StreamSummary(String tableId, int files, long totalSize)
+    {
+        this.tableId = tableId;
+        this.files = files;
+        this.totalSize = totalSize;
+    }

Review Comment:
   this constructor is unused. do we need it?



##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -664,6 +665,18 @@ public CompletableFuture<ListOperationalJobsResponse> 
listOperationalJobs(Sideca
                                             .build());
     }
 
+    /**
+     * Executes the stream stats request using the default retry policy and 
configured selection policy

Review Comment:
   ```suggestion
        * Executes the streams stats request using the default retry policy and 
configured selection policy
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java:
##########
@@ -159,4 +159,10 @@ public interface StorageJmxOperations
      * @throws InterruptedException it does not really throw but declared in 
MBean
      */
     int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) 
throws IOException, ExecutionException, InterruptedException;
+
+    /**
+     * Fetch the operation-mode of the node
+     * @return string representation of theoperation-mode

Review Comment:
   ```suggestion
        * @return string representation of the operation-mode
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream state data
+ */
+public class StreamState
+{
+    Collection<SessionInfo> sessions;
+
+    public StreamState(CompositeData data)
+    {
+        this.sessions = parseSessions((CompositeData[]) data.get("sessions"));
+    }
+
+    private Collection<SessionInfo> parseSessions(CompositeData[] sessions)

Review Comment:
   let's move helper methods to the end of the class



##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1573,6 +1575,36 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," +
+                                             
"\"streamProgressStats\":{\"totalFilesToReceive\":7," +

Review Comment:
   ```suggestion
                                                
"\"streamsProgressStats\":{\"totalFilesToReceive\":7," +
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStreamManagerOperations.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.adapters.base.data.SessionInfo;
+import org.apache.cassandra.sidecar.adapters.base.data.StreamState;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.StreamManagerJmxOperations.STREAM_MANAGER_OBJ_NAME;
+
+/**
+ * An implementation of the {@link StreamManagerOperations} that interfaces 
with Cassandra 4.0 and later
+ */
+public class CassandraStreamManagerOperations implements 
StreamManagerOperations
+{
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraStorageOperations.class);
+    protected final JmxClient jmxClient;
+
+    /**
+     * Creates a new instance with the provided {@link JmxClient} and {@link 
DnsResolver}
+     *
+     * @param jmxClient   the JMX client used to communicate with the 
Cassandra instance
+     * @param dnsResolver the DNS resolver used to lookup replicas
+     */
+    public CassandraStreamManagerOperations(JmxClient jmxClient, DnsResolver 
dnsResolver)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public StreamProgressStats getStreamProgressStats()
+    {
+        Set<CompositeData> streamData = 
jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME)
+                                                 .getCurrentStreams();
+
+        List<StreamState> streamStates = 
streamData.stream().map(StreamState::new).collect(Collectors.toList());
+        return computeStats(streamStates);
+    }
+
+    private StreamProgressStats computeStats(List<StreamState> streamStates)
+    {
+        List<SessionInfo> sessions = streamStates.stream().map(s -> 
s.sessions()).flatMap(Collection::stream).collect(Collectors.toList());
+
+        long totalFilesToReceive = 0;
+        long totalFilesReceived = 0;
+        long totalBytesToReceive = 0;
+        long totalBytesReceived = 0;
+
+        long totalFilesToSend = 0;
+        long totalFilesSent = 0;
+        long totalBytesToSend = 0;
+        long totalBytesSent = 0;
+
+        for (SessionInfo s : sessions)
+        {
+            totalBytesToReceive += s.getTotalSizeToReceive();
+            totalBytesReceived += s.getTotalSizeReceived();
+            totalFilesToReceive += s.getTotalFilesToReceive();
+            totalFilesReceived += s.getTotalFilesReceived();
+            totalBytesToSend += s.getTotalSizeToSend();
+            totalBytesSent += s.getTotalSizeSent();
+            totalFilesToSend += s.getTotalFilesToSend();
+            totalFilesSent += s.getTotalFilesSent();
+
+        }
+        LOGGER.info("Progress Stats: {}, {}, {}, {}", totalBytesToReceive, 
totalBytesReceived, totalBytesToSend, totalBytesSent);

Review Comment:
   hmm, should we move this to debug instead?



##########
server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.stubbing.Answer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StreamStatsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsHandlerTest
+{
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new StreamingStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamingStatsHandler(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;
+        };
+
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  StreamStatsResponse statsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.operationMode()).isEqualTo("NORMAL");
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testStreamingStatsHandlerFailure(VertxTestContext context)

Review Comment:
   this test is exactly the same as the `testStreamingStatsHandler` test. Are 
we really testing failure here?



##########
server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.StreamManagerOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.stubbing.Answer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StreamStatsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsHandlerTest
+{
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new StreamingStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamingStatsHandler(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;
+        };
+
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  StreamStatsResponse statsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.operationMode()).isEqualTo("NORMAL");
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testStreamingStatsHandlerFailure(VertxTestContext context)
+    {
+        StreamingStatsTestModule.streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;
+        };
+
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  StreamStatsResponse statsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.operationMode()).isEqualTo("NORMAL");
+                  context.completeNow();
+              }));
+    }
+
+
+    static class StreamingStatsTestModule extends AbstractModule
+    {
+        static Supplier<StreamStatsResponse> streamingStatsSupplier;
+
+        @Provides
+        @Singleton
+        public InstancesConfig instanceConfig() throws IOException

Review Comment:
   ```suggestion
           public InstancesConfig instanceConfig()
   ```



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, 
network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, 
ConfigurableCassandraTestContext cassandraTestContext) throws 
InterruptedException
+    {
+
+        BBHelperDecommissioningNode.reset();
+        UpgradeableCluster cluster = 
cassandraTestContext.configureAndStartCluster(
+        builder -> 
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
+        IUpgradeableInstance node = cluster.get(5);
+        IUpgradeableInstance seed = cluster.get(1);
+
+        createTestKeyspace();
+        createTestTableAndPopulate();
+
+        startAsync("Decommission node" + node.config().num(),
+                   () -> 
node.nodetoolResult("decommission").asserts().success());
+        AtomicBoolean hasStats = new AtomicBoolean(false);
+        AtomicBoolean dataReceived = new AtomicBoolean(false);
+
+        // Wait until nodes have reached expected state
+        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, 
TimeUnit.MINUTES, "transientStateStart");
+
+        ClusterUtils.awaitRingState(seed, node, "Leaving");
+        BBHelperDecommissioningNode.transientStateEnd.countDown();
+
+        for (int i = 0; i < 20; i++)
+        {
+            startAsync("Request-" + i , () -> {
+                try
+                {
+                    streamStats(context, hasStats, dataReceived);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+
+        assertThat(hasStats).isTrue();
+        assertThat(dataReceived).isTrue();
+        context.completeNow();
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void streamStats(VertxTestContext context, AtomicBoolean hasStats, 
AtomicBoolean dataReceived) throws Exception
+    {
+        String testRoute = "/api/v1/cassandra/stats/stream";
+        testWithClient(context, client -> {
+            BBHelperDecommissioningNode.transientStateEnd.countDown();
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .send(context.succeeding(response -> {
+                       assertRingResponseOK(response, sidecarTestContext, 
hasStats, dataReceived);
+                  }));
+        });
+    }
+
+    void assertRingResponseOK(HttpResponse<Buffer> response, 
CassandraSidecarTestContext cassandraTestContext,

Review Comment:
   this is not really a ring response? Also cassandraTestContext is unused
   ```suggestion
       void assertRingResponseOK(HttpResponse<Buffer> response,
   ```



##########
spotbugs-exclude.xml:
##########
@@ -62,7 +62,10 @@
 
     <Match>
         <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
-        <Class name="org.apache.cassandra.sidecar.routes.RingHandlerTest" />
+        <Or>
+            <Class name="org.apache.cassandra.sidecar.routes.RingHandlerTest" 
/>
+            <Class 
name="org.apache.cassandra.sidecar.routes.StreamStatsHandlerTest" />

Review Comment:
   let's not statically set the fields



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, 
network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, 
ConfigurableCassandraTestContext cassandraTestContext) throws 
InterruptedException
+    {
+

Review Comment:
   ```suggestion
   ```



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, 
network = true, buildCluster = false)

Review Comment:
   do we need 5 nodes? We are trying to save time on test runtime and carefully 
choosing the cluster size is a goal we have to reduce the runtime and resources 
needed to run tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to