This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 5be1038c5d Streaming progress virtual table lock contention can 
trigger TCP_USER_TIMEOUT and fail streaming
5be1038c5d is described below

commit 5be1038c5d38af32d3cbb0545d867f21304f3a46
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Wed Jan 11 13:40:57 2023 -0800

    Streaming progress virtual table lock contention can trigger 
TCP_USER_TIMEOUT and fail streaming
    
    patch by David Capwell; reviewed by Abe Ratnofsky, Jon Meredith for 
CASSANDRA-18110
---
 CHANGES.txt                                        |   3 +-
 conf/cassandra.yaml                                |  10 ++
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  27 ++++
 .../org/apache/cassandra/config/DurationSpec.java  |  10 ++
 .../streaming/CassandraCompressedStreamReader.java |   7 +-
 .../streaming/CassandraCompressedStreamWriter.java |   3 +-
 .../CassandraEntireSSTableStreamReader.java        |   2 +-
 .../CassandraEntireSSTableStreamWriter.java        |   2 +-
 .../db/streaming/CassandraStreamReader.java        |   7 +-
 .../db/streaming/CassandraStreamWriter.java        |   6 +-
 .../apache/cassandra/streaming/ProgressInfo.java   |   5 +-
 .../apache/cassandra/streaming/StreamEvent.java    |   4 +-
 .../apache/cassandra/streaming/StreamManager.java  |  26 ++++
 .../cassandra/streaming/StreamManagerMBean.java    |  20 +++
 .../cassandra/streaming/StreamResultFuture.java    |  15 ++-
 .../apache/cassandra/streaming/StreamSession.java  |  22 ++--
 .../apache/cassandra/streaming/StreamingState.java | 143 +++++----------------
 .../management/ProgressInfoCompositeData.java      |   3 +
 .../test/streaming/RebuildStreamingTest.java       |  33 ++++-
 .../test/streaming/StreamingStatsDisabledTest.java |  65 ++++++++++
 .../distributed/util/QueryResultUtil.java          |   7 +
 .../db/virtual/StreamingVirtualTableTest.java      |  85 +++++++++---
 .../cassandra/streaming/SessionInfoTest.java       |   4 +-
 24 files changed, 361 insertions(+), 151 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c6ceac0575..c5d192a9c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 4.1.1
-* Fix perpetual load of denylist on read in cases where denylist can never be 
loaded (CASSANDRA-18116)
+ * Streaming progress virtual table lock contention can trigger 
TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110)
+ * Fix perpetual load of denylist on read in cases where denylist can never be 
loaded (CASSANDRA-18116)
 Merged from 4.0:
  * Avoid ConcurrentModificationException in STCS/DTCS/TWCS.getSSTables 
(CASSANDRA-17977)
  * Restore internode custom tracing on 4.0's new messaging system 
(CASSANDRA-17981)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7cc72605..f54c0d4617 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1161,6 +1161,16 @@ slow_query_log_timeout: 500ms
 # bound (for example a few nodes with big files).
 # streaming_connections_per_host: 1
 
+# Settings for stream stats tracking; used by system_views.streaming table
+# How long before a stream is evicted from tracking; this impacts both 
historic and currently running
+# streams.
+# streaming_state_expires: 3d
+# How much memory may be used for tracking before evicting sessions from 
tracking; once crossed
+# historic and currently running streams may be impacted.
+# streaming_state_size: 40MiB
+# Enable/Disable tracking of streaming stats
+# streaming_stats_enabled: true
+
 # Allows denying configurable access (rw/rr) to operations on configured ks, 
table, and partitions, intended for use by
 # operators to manage cluster health vs application access. See 
CASSANDRA-12106 and CEP-13 for more details.
 # partition_denylist_enabled: false
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index df419e780c..8a59ca2cda 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -852,6 +852,9 @@ public class Config
     public volatile DurationSpec.LongNanosecondsBound streaming_state_expires 
= new DurationSpec.LongNanosecondsBound("3d");
     public volatile DataStorageSpec.LongBytesBound streaming_state_size = new 
DataStorageSpec.LongBytesBound("40MiB");
 
+    public volatile boolean streaming_stats_enabled = true;
+    public volatile DurationSpec.IntSecondsBound 
streaming_slow_events_log_timeout = new DurationSpec.IntSecondsBound("10s");
+
     /** The configuration of startup checks. */
     public volatile Map<StartupCheckType, Map<String, Object>> startup_checks 
= new HashMap<>();
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 62d1acdada..d2c529c3d4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -4294,6 +4294,33 @@ public class DatabaseDescriptor
         }
     }
 
+    public static boolean getStreamingStatsEnabled()
+    {
+        return conf.streaming_stats_enabled;
+    }
+
+    public static void setStreamingStatsEnabled(boolean streamingStatsEnabled)
+    {
+        if (conf.streaming_stats_enabled != streamingStatsEnabled)
+        {
+            logger.info("Setting streaming_stats_enabled to {}", 
streamingStatsEnabled);
+            conf.streaming_stats_enabled = streamingStatsEnabled;
+        }
+    }
+
+    public static DurationSpec.IntSecondsBound 
getStreamingSlowEventsLogTimeout() {
+        return conf.streaming_slow_events_log_timeout;
+    }
+
+    public static void setStreamingSlowEventsLogTimeout(String value) {
+        DurationSpec.IntSecondsBound next = new 
DurationSpec.IntSecondsBound(value);
+        if (!conf.streaming_slow_events_log_timeout.equals(next)) // only log and assign when the parsed value actually differs
+        {
+            logger.info("Setting streaming_slow_events_log_timeout to {}", value); // log the real property name, parameterized like setStreamingStatsEnabled
+            conf.streaming_slow_events_log_timeout = next;
+        }
+    }
+
     public static boolean isUUIDSSTableIdentifiersEnabled()
     {
         return conf.uuid_sstable_identifiers_enabled;
diff --git a/src/java/org/apache/cassandra/config/DurationSpec.java 
b/src/java/org/apache/cassandra/config/DurationSpec.java
index ba7e9f8415..10d56c23eb 100644
--- a/src/java/org/apache/cassandra/config/DurationSpec.java
+++ b/src/java/org/apache/cassandra/config/DurationSpec.java
@@ -520,6 +520,16 @@ public abstract class DurationSpec
             return new IntSecondsBound(value);
         }
 
+        /**
+         * Returns this duration in the number of nanoseconds as an {@code int}
+         *
+         * @return this duration in number of nanoseconds or {@code 
Integer.MAX_VALUE} if the number of nanoseconds is too large; an {@code int} holds at most ~2.147 seconds of nanoseconds, so any longer duration saturates.
+         */
+        public int toNanoseconds()
+        {
+            return Ints.saturatedCast(unit().toNanos(quantity()));
+        }
+
         /**
          * Returns this duration in number of milliseconds as an {@code int}
          *
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index dda874ba40..005a9aaa6c 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -78,6 +78,7 @@ public class CassandraCompressedStreamReader extends 
CassandraStreamReader
             deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
             writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
             String filename = writer.getFilename();
+            String sectionName = filename + '-' + fileSeqNum;
             int sectionIdx = 0;
             for (SSTableReader.PartitionPositionBounds section : sections)
             {
@@ -89,11 +90,15 @@ public class CassandraCompressedStreamReader extends 
CassandraStreamReader
                 cis.position(section.lowerPosition);
                 in.reset(0);
 
+                long lastBytesRead = 0;
                 while (in.getBytesRead() < sectionLength)
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed 
chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename + '-' + fileSeqNum, 
ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
+                    long bytesRead = cis.chunkBytesRead();
+                    long bytesDelta = bytesRead - lastBytesRead;
+                    lastBytesRead = bytesRead;
+                    session.progress(sectionName, ProgressInfo.Direction.IN, 
bytesRead, bytesDelta, totalSize);
                 }
                 assert in.getBytesRead() == sectionLength;
             }
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index 99908f261b..41fd9b1651 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -71,6 +71,7 @@ public class CassandraCompressedStreamWriter extends 
CassandraStreamWriter
             int sectionIdx = 0;
 
             // stream each of the required sections of the file
+            String filename = sstable.descriptor.filenameFor(Component.DATA);
             for (Section section : sections)
             {
                 // length of the section to stream
@@ -94,7 +95,7 @@ public class CassandraCompressedStreamWriter extends 
CassandraStreamWriter
 
                     bytesTransferred += toTransfer;
                     progress += toTransfer;
-                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
+                    session.progress(filename, ProgressInfo.Direction.OUT, 
progress, toTransfer, totalSize);
                 }
             }
             logger.debug("[Stream #{}] Finished streaming file {} to {}, 
bytesTransferred = {}, totalSize = {}",
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 261c59ef63..515c85dea6 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -122,7 +122,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
                              prettyPrintMemory(totalSize));
 
                 writer.writeComponent(component.type, in, length);
-                session.progress(writer.descriptor.filenameFor(component), 
ProgressInfo.Direction.IN, length, length);
+                session.progress(writer.descriptor.filenameFor(component), 
ProgressInfo.Direction.IN, length, length, length);
                 bytesRead += length;
 
                 logger.debug("[Stream #{}] Finished receiving {} component 
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
index 68546cefe6..3d679a515e 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -93,7 +93,7 @@ public class CassandraEntireSSTableStreamWriter
             long bytesWritten = out.writeFileToChannel(channel, limiter);
             progress += bytesWritten;
 
-            session.progress(sstable.descriptor.filenameFor(component), 
ProgressInfo.Direction.OUT, bytesWritten, length);
+            session.progress(sstable.descriptor.filenameFor(component), 
ProgressInfo.Direction.OUT, bytesWritten, bytesWritten, length);
 
             logger.debug("[Stream #{}] Finished streaming {}.{} gen {} 
component {} to {}, xfered = {}, length = {}, totalSize = {}",
                          session.planId(),
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 2cccee3a0a..04268f024c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -126,11 +126,16 @@ public class CassandraStreamReader implements 
IStreamReader
             TrackedDataInputPlus in = new 
TrackedDataInputPlus(streamCompressionInputStream);
             deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
             writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
+            String sequenceName = writer.getFilename() + '-' + fileSeqNum;
+            long lastBytesRead = 0;
             while (in.getBytesRead() < totalSize)
             {
                 writePartition(deserializer, writer);
                 // TODO move this to BytesReadTracker
-                session.progress(writer.getFilename() + '-' + fileSeqNum, 
ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+                long bytesRead = in.getBytesRead();
+                long bytesDelta = bytesRead - lastBytesRead;
+                lastBytesRead = bytesRead;
+                session.progress(sequenceName, ProgressInfo.Direction.IN, 
bytesRead, bytesDelta, totalSize);
             }
             logger.debug("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}",
                          session.planId(), fileSeqNum, session.peer, 
FBUtilities.prettyPrintMemory(in.getBytesRead()), 
FBUtilities.prettyPrintMemory(totalSize));
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index d69f3eec59..9d9ea3c1fc 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -94,6 +94,7 @@ public class CassandraStreamWriter
             long progress = 0L;
 
             // stream each of the required sections of the file
+            String filename = sstable.descriptor.filenameFor(Component.DATA);
             for (SSTableReader.PartitionPositionBounds section : sections)
             {
                 long start = validator == null ? section.lowerPosition : 
validator.chunkStart(section.lowerPosition);
@@ -112,8 +113,9 @@ public class CassandraStreamWriter
                     long lastBytesRead = write(proxy, validator, out, start, 
transferOffset, toTransfer, bufferSize);
                     start += lastBytesRead;
                     bytesRead += lastBytesRead;
-                    progress += (lastBytesRead - transferOffset);
-                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
+                    long delta = lastBytesRead - transferOffset;
+                    progress += delta;
+                    session.progress(filename, ProgressInfo.Direction.OUT, 
progress, delta, totalSize);
                     transferOffset = 0;
                 }
 
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java 
b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index 2b306f8c1b..2ed78aca27 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -54,9 +54,11 @@ public class ProgressInfo implements Serializable
     public final String fileName;
     public final Direction direction;
     public final long currentBytes;
+    public final long deltaBytes; // change from previous ProgressInfo
     public final long totalBytes;
 
-    public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String 
fileName, Direction direction, long currentBytes, long totalBytes)
+    public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String 
fileName, Direction direction,
+                        long currentBytes,  long deltaBytes, long totalBytes)
     {
         assert totalBytes > 0;
 
@@ -65,6 +67,7 @@ public class ProgressInfo implements Serializable
         this.fileName = fileName;
         this.direction = direction;
         this.currentBytes = currentBytes;
+        this.deltaBytes = deltaBytes;
         this.totalBytes = totalBytes;
     }
 
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java 
b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index be7ad3dea6..ff83a191e9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -87,11 +87,13 @@ public abstract class StreamEvent
     public static class SessionPreparedEvent extends StreamEvent
     {
         public final SessionInfo session;
+        public final StreamSession.PrepareDirection prepareDirection;
 
-        public SessionPreparedEvent(TimeUUID planId, SessionInfo session)
+        public SessionPreparedEvent(TimeUUID planId, SessionInfo session, 
StreamSession.PrepareDirection prepareDirection)
         {
             super(Type.STREAM_PREPARED, planId);
             this.session = session;
+            this.prepareDirection = prepareDirection;
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java 
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 46ab422eff..408b6f4abe 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -230,6 +230,8 @@ public class StreamManager implements StreamManagerMBean
         @Override
         public void onRegister(StreamResultFuture result)
         {
+            if (!DatabaseDescriptor.getStreamingStatsEnabled())
+                return;
             // reason for synchronized rather than states.get is to detect 
duplicates
             // streaming shouldn't be producing duplicates as that would imply 
a planId collision
             synchronized (states)
@@ -312,6 +314,30 @@ public class StreamManager implements StreamManagerMBean
         }));
     }
 
+    @Override
+    public boolean getStreamingStatsEnabled()
+    {
+        return DatabaseDescriptor.getStreamingStatsEnabled();
+    }
+
+    @Override
+    public void setStreamingStatsEnabled(boolean streamingStatsEnabled)
+    {
+        DatabaseDescriptor.setStreamingStatsEnabled(streamingStatsEnabled);
+    }
+
+    @Override
+    public String getStreamingSlowEventsLogTimeout()
+    {
+        return 
DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toString();
+    }
+
+    @Override
+    public void setStreamingSlowEventsLogTimeout(String value)
+    {
+        DatabaseDescriptor.setStreamingSlowEventsLogTimeout(value);
+    }
+
     public void registerInitiator(final StreamResultFuture result)
     {
         result.addEventListener(notifier);
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java 
b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
index f329596c4a..e49c059590 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -29,4 +29,24 @@ public interface StreamManagerMBean extends 
NotificationEmitter
      * Returns the current state of all ongoing streams.
      */
     Set<CompositeData> getCurrentStreams();
+
+    /**
+     * @return whether the streaming virtual table should collect stats while 
streaming is running
+     */
+    boolean getStreamingStatsEnabled();
+
+    /**
+     * enable/disable collection of streaming stats while streaming is running.
+     */
+    void setStreamingStatsEnabled(boolean streamingStatsEnabled);
+
+    /**
+     * @return current timeout for streaming slow events log
+     */
+    String getStreamingSlowEventsLogTimeout();
+
+    /**
+     * Sets the timeout for the streaming slow events log
+     */
+    void setStreamingSlowEventsLogTimeout(String value);
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 22af238de6..b43203dee3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,11 +17,15 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.slf4j.Logger;
@@ -31,6 +35,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.streaming.StreamingChannel.Factory.Global.streamingFactory;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 /**
  * A future on the result ({@link StreamState}) of a streaming plan.
@@ -53,6 +58,7 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
     public final StreamOperation streamOperation;
     private final StreamCoordinator coordinator;
     private final Collection<StreamEventHandler> eventListeners = new 
ConcurrentLinkedQueue<>();
+    private final long slowEventsLogTimeoutNanos = 
TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toSeconds()); // convert in long: an int nanosecond count saturates at ~2.147s, which is below the 10s default
 
     /**
      * Create new StreamResult of given {@code planId} and streamOperation.
@@ -175,7 +181,7 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
         return planId.hashCode();
     }
 
-    void handleSessionPrepared(StreamSession session)
+    void handleSessionPrepared(StreamSession session, 
StreamSession.PrepareDirection prepareDirection)
     {
         SessionInfo sessionInfo = session.getSessionInfo();
         logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} 
files({}), sending {} files({})",
@@ -185,7 +191,7 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
                               
FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToReceive()),
                               sessionInfo.getTotalFilesToSend(),
                               
FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToSend()));
-        StreamEvent.SessionPreparedEvent event = new 
StreamEvent.SessionPreparedEvent(planId, sessionInfo);
+        StreamEvent.SessionPreparedEvent event = new 
StreamEvent.SessionPreparedEvent(planId, sessionInfo, prepareDirection);
         coordinator.addSessionInfo(sessionInfo);
         fireStreamEvent(event);
     }
@@ -208,6 +214,7 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
     synchronized void fireStreamEvent(StreamEvent event)
     {
         // delegate to listener
+        long startNanos = nanoTime();
         for (StreamEventHandler listener : eventListeners)
         {
             try
@@ -219,6 +226,10 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
                 logger.warn("Unexpected exception in listern while calling 
handleStreamEvent", t);
             }
         }
+        long totalNanos = nanoTime() - startNanos;
+        if (totalNanos > slowEventsLogTimeoutNanos)
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}",
+                             () -> new Object[] { 
Duration.ofNanos(slowEventsLogTimeoutNanos), Duration.ofNanos(totalNanos)});
     }
 
     private synchronized void maybeComplete()
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a540a1b6d0..811717f85d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -148,6 +148,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
 
+    public enum PrepareDirection { SEND, ACK }
+
     // for test purpose to record received message and state transition
     public volatile static MessageStateSink sink = MessageStateSink.NONE;
 
@@ -738,7 +740,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             for (StreamTransferTask task : transfers.values())
                 prepareSynAck.summaries.add(task.getSummary());
 
-        streamResult.handleSessionPrepared(this);
+        streamResult.handleSessionPrepared(this, PrepareDirection.SEND);
         // After sending the message the initiator can close the channel which 
will cause a ClosedChannelException
         // in buffer logic, this then gets sent to onError which validates the 
state isFinalState, if not fails
         // the session.  To avoid a race condition between sending and setting 
state, make sure to update the state
@@ -769,14 +771,14 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         if (isPreview())
             completePreview();
         else
-            startStreamingFiles(true);
+            startStreamingFiles(PrepareDirection.ACK);
     }
 
     private void prepareAck(PrepareAckMessage msg)
     {
         if (isPreview())
             throw new RuntimeException(String.format("[Stream #%s] Cannot 
receive PrepareAckMessage for preview session", planId()));
-        startStreamingFiles(true);
+        startStreamingFiles(PrepareDirection.ACK);
     }
 
     /**
@@ -845,9 +847,13 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         }
     }
 
-    public void progress(String filename, ProgressInfo.Direction direction, 
long bytes, long total)
+    public void progress(String filename, ProgressInfo.Direction direction, 
long bytes, long delta, long total)
     {
-        ProgressInfo progress = new ProgressInfo(peer, index, filename, 
direction, bytes, total);
+        if (delta < 0) // report only (rate-limited to once a minute); the event is still forwarded below
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES,
+                             "[id={}, key=({}, {}, {})] Stream event reported 
a negative delta ({})",
+                             planId(), peer, filename, direction, delta);
+        ProgressInfo progress = new ProgressInfo(peer, index, filename, 
direction, bytes, delta, total);
         streamResult.handleProgress(progress);
     }
 
@@ -1008,10 +1014,10 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             receivers.put(summary.tableId, new StreamReceiveTask(this, 
summary.tableId, summary.files, summary.totalSize));
     }
 
-    private void startStreamingFiles(boolean notifyPrepared)
+    private void startStreamingFiles(@Nullable PrepareDirection 
prepareDirection)
     {
-        if (notifyPrepared)
-            streamResult.handleSessionPrepared(this);
+        if (prepareDirection != null)
+            streamResult.handleSessionPrepared(this, prepareDirection);
 
         state(State.STREAMING);
 
diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java 
b/src/java/org/apache/cassandra/streaming/StreamingState.java
index 23685bd0ef..c2eed1ea9e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingState.java
@@ -20,16 +20,16 @@ package org.apache.cassandra.streaming;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.net.InetSocketAddress;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,18 +54,12 @@ public class StreamingState implements StreamEventHandler
 
     private final long createdAtMillis = Clock.Global.currentTimeMillis();
 
-    // while streaming is running, this is a cache of StreamInfo seen with 
progress state
-    // the reason for the cache is that StreamSession drops data after tasks 
(recieve/send) complete, this makes
-    // it so that current state of a future tracks work pending rather than 
work done, cache solves this by not deleting
-    // when tasks complete
-    // To lower memory costs, clear this after the stream completes
-    private ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = new 
ConcurrentHashMap<>();
-
     private final TimeUUID id;
     private final boolean follower;
     private final StreamOperation operation;
-    private Set<InetSocketAddress> peers = null;
-    private Sessions sessions = Sessions.EMPTY;
+    private final Set<InetSocketAddress> peers = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+    @GuardedBy("this")
+    private final Sessions sessions = new Sessions();
 
     private Status status;
     private String completeMessage = null;
@@ -107,13 +101,7 @@ public class StreamingState implements StreamEventHandler
 
     public Set<InetSocketAddress> peers()
     {
-        Set<InetSocketAddress> peers = this.peers;
-        if (peers != null)
-            return peers;
-        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
-        if (streamProgress != null)
-            return streamProgress.keySet();
-        return Collections.emptySet();
+        return this.peers;
     }
 
     public Status status()
@@ -138,6 +126,7 @@ public class StreamingState implements StreamEventHandler
         }
     }
 
+    @VisibleForTesting
     public StreamResultFuture future()
     {
         if (follower)
@@ -225,12 +214,6 @@ public class StreamingState implements StreamEventHandler
     @Override
     public synchronized void handleStreamEvent(StreamEvent event)
     {
-        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
-        if (streamProgress == null)
-        {
-            logger.warn("Got stream event {} after the stream completed", 
event.eventType);
-            return;
-        }
         try
         {
             switch (event.eventType)
@@ -252,52 +235,52 @@ public class StreamingState implements StreamEventHandler
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+
+        if (info.direction == ProgressInfo.Direction.IN)
         {
-            info.updateProgress(event.progress);
+            // receiving
+            sessions.bytesReceived += info.deltaBytes;
+            if (info.isCompleted())
+                sessions.filesReceived++;
         }
         else
         {
-            logger.warn("[Stream #{}} ID#{}] Recieved stream progress before 
prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+            // sending
+            sessions.bytesSent += info.deltaBytes;
+            if (info.isCompleted())
+                sessions.filesSent++;
         }
     }
 
     @Override
     public synchronized void onSuccess(@Nullable StreamState state)
     {
-        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
-        if (streamProgress != null)
-        {
-            sessions = Sessions.create(streamProgress.values());
-            peers = new HashSet<>(streamProgress.keySet());
-            this.streamProgress = null;
-            updateState(Status.SUCCESS);
-        }
+        updateState(Status.SUCCESS);
     }
 
     @Override
     public synchronized void onFailure(Throwable throwable)
     {
-        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
-        if (streamProgress != null)
-        {
-            sessions = Sessions.create(streamProgress.values());
-            peers = new HashSet<>(streamProgress.keySet());
-            this.streamProgress = null;
-        }
         completeMessage = Throwables.getStackTraceAsString(throwable);
         updateState(Status.FAILURE);
     }
@@ -326,24 +309,10 @@ public class StreamingState implements StreamEventHandler
 
     public static class Sessions
     {
-        public static final Sessions EMPTY = new Sessions(0, 0, 0, 0, 0, 0, 0, 
0);
-
-        public final long bytesToReceive, bytesReceived;
-        public final long bytesToSend, bytesSent;
-        public final long filesToReceive, filesReceived;
-        public final long filesToSend, filesSent;
-
-        public Sessions(long bytesToReceive, long bytesReceived, long 
bytesToSend, long bytesSent, long filesToReceive, long filesReceived, long 
filesToSend, long filesSent)
-        {
-            this.bytesToReceive = bytesToReceive;
-            this.bytesReceived = bytesReceived;
-            this.bytesToSend = bytesToSend;
-            this.bytesSent = bytesSent;
-            this.filesToReceive = filesToReceive;
-            this.filesReceived = filesReceived;
-            this.filesToSend = filesToSend;
-            this.filesSent = filesSent;
-        }
+        public long bytesToReceive, bytesReceived;
+        public long bytesToSend, bytesSent;
+        public long filesToReceive, filesReceived;
+        public long filesToSend, filesSent;
 
         public static String columns()
         {
@@ -357,51 +326,9 @@ public class StreamingState implements StreamEventHandler
                    "  files_sent bigint, \n";
         }
 
-        public static Sessions create(Collection<SessionInfo> sessions)
-        {
-            long bytesToReceive = 0;
-            long bytesReceived = 0;
-            long filesToReceive = 0;
-            long filesReceived = 0;
-            long bytesToSend = 0;
-            long bytesSent = 0;
-            long filesToSend = 0;
-            long filesSent = 0;
-            for (SessionInfo session : sessions)
-            {
-                bytesToReceive += session.getTotalSizeToReceive();
-                bytesReceived += session.getTotalSizeReceived();
-
-                filesToReceive += session.getTotalFilesToReceive();
-                filesReceived += session.getTotalFilesReceived();
-
-                bytesToSend += session.getTotalSizeToSend();
-                bytesSent += session.getTotalSizeSent();
-
-                filesToSend += session.getTotalFilesToSend();
-                filesSent += session.getTotalFilesSent();
-            }
-            if (0 == bytesToReceive && 0 == bytesReceived && 0 == 
filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0 
== filesToSend && 0 == filesSent)
-                return EMPTY;
-            return new Sessions(bytesToReceive, bytesReceived,
-                                bytesToSend, bytesSent,
-                                filesToReceive, filesReceived,
-                                filesToSend, filesSent);
-        }
-
         public boolean isEmpty()
         {
-            return this == EMPTY;
-        }
-
-        public BigDecimal receivedBytesPercent()
-        {
-            return div(bytesReceived, bytesToReceive);
-        }
-
-        public BigDecimal sentBytesPercent()
-        {
-            return div(bytesSent, bytesToSend);
+            return bytesToReceive == 0 && bytesToSend == 0 && filesToReceive 
== 0 && filesToSend == 0;
         }
 
         public BigDecimal progress()
diff --git 
a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
 
b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index a3eb7d1f34..72ab84407a 100644
--- 
a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ 
b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -74,6 +74,8 @@ public class ProgressInfoCompositeData
 
     public static CompositeData toCompositeData(TimeUUID planId, ProgressInfo 
progressInfo)
     {
+        // Delta is not returned as it wasn't clear the impact to backwards 
compatibility; it may be safe to expose.
+        // see CASSANDRA-18110
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], planId.toString());
         valueMap.put(ITEM_NAMES[1], 
progressInfo.peer.getAddress().getHostAddress());
@@ -103,6 +105,7 @@ public class ProgressInfoCompositeData
                                     (String) values[4],
                                     
ProgressInfo.Direction.valueOf((String)values[5]),
                                     (long) values[6],
+                                    (long) values[6],
                                     (long) values[7]);
         }
         catch (UnknownHostException e)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
index db896697fb..ee41ec42d7 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.distributed.test.streaming;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 
 import org.junit.Test;
@@ -29,28 +30,49 @@ import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class RebuildStreamingTest extends TestBaseImpl
 {
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    // zero copy streaming sends all components, so the events will include 
non-Data files as well
+    private static final int NUM_COMPONENTS = 7;
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
     @Test
-    public void test() throws IOException
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
     {
         try (Cluster cluster = init(Cluster.build(2)
-                                           .withConfig(c -> 
c.with(Feature.values()).set("stream_entire_sstables", false))
+                                           .withConfig(c -> 
c.with(Feature.values())
+                                                             
.set("stream_entire_sstables", 
zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
                                            .start()))
         {
-            cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id 
varchar PRIMARY KEY);"));
+            // streaming sends events every 65k, so need to make sure that the 
files are larger than this to hit
+            // all cases of the vtable
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id 
varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' : 
false };"));
             cluster.stream().forEach(i -> 
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
             IInvokableInstance first = cluster.get(1);
             IInvokableInstance second = cluster.get(2);
             long expectedFiles = 10;
             for (int i = 0; i < expectedFiles; i++)
             {
-                first.executeInternal(withKeyspace("insert into 
%s.users(user_id) values (?)"), "dcapwell" + i);
+                first.executeInternal(withKeyspace("insert into 
%s.users(user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB);
                 first.flush(KEYSPACE);
             }
+            if (zeroCopyStreaming) // will include all components so need to 
account for
+                expectedFiles *= NUM_COMPONENTS;
 
             second.nodetoolResult("rebuild", "--keyspace", 
KEYSPACE).asserts().success();
 
@@ -91,6 +113,9 @@ public class RebuildStreamingTest extends TestBaseImpl
                            .columnsEqualTo("files_to_receive", 
"files_received").isEqualTo("files_received", expectedFiles)
                            .columnsEqualTo("bytes_to_receive", 
"bytes_received").isEqualTo("bytes_received", totalBytes)
                            .columnsEqualTo("files_sent", "files_to_send", 
"bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+
+            // did we trigger slow event log?
+            cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling 
streaming events took longer than").getResult()).describedAs("Unable to find 
slow log for node%d", i.config().num()).isNotEmpty());
         }
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java
new file mode 100644
index 0000000000..f3d4394d21
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamingStatsDisabledTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> 
c.with(Feature.values()).set("streaming_stats_enabled", false))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id 
varchar, PRIMARY KEY (user_id));"));
+            cluster.stream().forEach(i -> 
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
+
+            long expectedFiles = 10;
+            for (int i = 0; i < expectedFiles; i++)
+            {
+                cluster.get(1).executeInternal(withKeyspace("insert into 
%s.users(user_id) values (?)"), "dcapwell" + i);
+                cluster.get(1).flush(KEYSPACE);
+            }
+
+            cluster.get(2).nodetoolResult("rebuild", "--keyspace", 
KEYSPACE).asserts().success();
+            for (int nodeId : Arrays.asList(1, 2))
+                
QueryResultUtil.assertThat(cluster.get(nodeId).executeInternalWithResult("SELECT
 * FROM system_views.streaming")).isEmpty();
+
+            // trigger streaming again
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.users(user_id) VALUES ('trigger streaming')"));
+            // mimic JMX
+            cluster.get(2).runOnInstance(() -> 
StreamManager.instance.setStreamingStatsEnabled(true));
+            cluster.get(2).nodetoolResult("repair", 
KEYSPACE).asserts().success();
+
+            
QueryResultUtil.assertThat(cluster.get(1).executeInternalWithResult("SELECT * 
FROM system_views.streaming")).isEmpty();
+            
QueryResultUtil.assertThat(cluster.get(2).executeInternalWithResult("SELECT * 
FROM system_views.streaming")).hasSize(1);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java 
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
index 58842bc691..a502e8c1e6 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Objects;
 import java.util.function.Predicate;
 
+import com.google.monitoring.runtime.instrumentation.common.collect.Iterators;
 import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
@@ -176,5 +177,11 @@ public class QueryResultUtil
             
Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size);
             return this;
         }
+
+        public void isEmpty()
+        {
+            int size = Iterators.size(qr);
+            Assertions.assertThat(size).describedAs("QueryResult is not 
empty").isEqualTo(0);
+        }
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
index cce995688e..c8e3d89f6b 100644
--- a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
@@ -18,9 +18,13 @@
 package org.apache.cassandra.db.virtual;
 
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -34,9 +38,11 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.ProgressInfo.Direction;
 import org.apache.cassandra.streaming.SessionInfo;
 import org.apache.cassandra.streaming.StreamCoordinator;
 import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEvent.ProgressEvent;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamResultFuture;
@@ -90,7 +96,7 @@ public class StreamingVirtualTableTest extends CQLTester
         assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
                    new Object[] { state.id(), true, "Repair", 
Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()), 
null, null });
 
-        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.PREPARING)));
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.PREPARING), StreamSession.PrepareDirection.ACK));
 
         state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR, 
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), 
Collections.emptyList(), StreamSession.State.COMPLETE))));
         assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
@@ -118,8 +124,9 @@ public class StreamingVirtualTableTest extends CQLTester
         SessionInfo s1 = new SessionInfo(PEER2, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
         SessionInfo s2 = new SessionInfo(PEER3, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
 
-        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s1));
-        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s2));
+        // we only update stats on ACK
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s1, 
StreamSession.PrepareDirection.ACK));
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s2, 
StreamSession.PrepareDirection.ACK));
 
         long bytesToReceive = 0, bytesToSend = 0;
         long filesToReceive = 0, filesToSend = 0;
@@ -133,31 +140,34 @@ public class StreamingVirtualTableTest extends CQLTester
         assertRows(execute(t("select id, follower, peers, status, 
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, 
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from 
%s")),
                    new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 0F, bytesToReceive, 
0L, bytesToSend, 0L, filesToReceive, 0L, filesToSend, 0L });
 
-        // update progress
+        // update progress; sent all but 1 file
         long bytesReceived = 0, bytesSent = 0;
+        long filesReceived = 0, filesSent = 0;
         for (SessionInfo s : Arrays.asList(s1, s2))
         {
-            long in = s.getTotalFilesToReceive() - 1;
-            long inBytes = s.getTotalSizeToReceive() - in;
-            long out = s.getTotalFilesToSend() - 1;
-            long outBytes = s.getTotalSizeToSend() - out;
-            state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), 
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", 
ProgressInfo.Direction.IN, inBytes, inBytes)));
-            state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), 
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", 
ProgressInfo.Direction.OUT, outBytes, outBytes)));
-            bytesReceived += inBytes;
-            bytesSent += outBytes;
+            List<StreamSummary> receiving = 
deterministic(s.receivingSummaries);
+            bytesReceived += progressEvent(state, s, receiving, Direction.IN);
+            filesReceived += receiving.stream().mapToInt(ss -> ss.files - 
1).sum();
+
+            List<StreamSummary> sending = deterministic(s.sendingSummaries);
+            bytesSent += progressEvent(state, s, sending, Direction.OUT);
+            filesSent += sending.stream().mapToInt(ss -> ss.files - 1).sum();
         }
 
         assertRows(execute(t("select id, follower, peers, status, 
bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, 
files_received, files_to_send, files_sent from %s")),
-                   new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive, 
bytesReceived, bytesToSend, bytesSent, filesToReceive, 2L, filesToSend, 2L });
+                   new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive, 
bytesReceived, bytesToSend, bytesSent, filesToReceive, filesReceived, 
filesToSend, filesSent });
 
         // finish
         for (SessionInfo s : Arrays.asList(s1, s2))
         {
             // complete the rest
-            for (long i = 1; i < s.getTotalFilesToReceive(); i++)
-                state.handleStreamEvent(new 
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) 
s.peer, 0, Long.toString(i), ProgressInfo.Direction.IN, 1, 1)));
-            for (long i = 1; i < s.getTotalFilesToSend(); i++)
-                state.handleStreamEvent(new 
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) 
s.peer, 0, Long.toString(i), ProgressInfo.Direction.OUT, 1, 1)));
+            List<StreamSummary> receiving = 
deterministic(s.receivingSummaries);
+            bytesReceived += completeEvent(state, s, receiving, Direction.IN);
+            filesReceived += receiving.stream().mapToInt(ss -> ss.files - 
1).sum();
+
+            List<StreamSummary> sending = deterministic(s.sendingSummaries);
+            bytesSent += completeEvent(state, s, sending, Direction.OUT);
+            filesSent += sending.stream().mapToInt(ss -> ss.files - 1).sum();
         }
 
         assertRows(execute(t("select id, follower, peers, status, 
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, 
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from 
%s")),
@@ -168,6 +178,47 @@ public class StreamingVirtualTableTest extends CQLTester
                    new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "success", 100F, new 
Date(state.lastUpdatedAtMillis()), null, null });
     }
 
+    private static long progressEvent(StreamingState state, SessionInfo s, 
List<StreamSummary> summaries, Direction direction)
+    {
+        long counter = 0;
+        for (StreamSummary summary : summaries)
+        {
+            long fileSize = summary.totalSize / summary.files;
+            for (int i = 0; i < summary.files - 1; i++)
+            {
+                String fileName = summary.tableId + "-" + 
direction.name().toLowerCase() + "-" + i;
+                state.handleStreamEvent(new ProgressEvent(state.id(), new 
ProgressInfo((InetAddressAndPort) s.peer, 0, fileName, direction, fileSize, 
fileSize, fileSize)));
+                counter += fileSize;
+            }
+        }
+        return counter;
+    }
+
+    private static long completeEvent(StreamingState state, SessionInfo s, 
List<StreamSummary> summaries, Direction direction)
+    {
+        long counter = 0;
+        for (StreamSummary summary : summaries)
+        {
+            long fileSize = summary.totalSize / summary.files;
+            String fileName = summary.tableId + "-" + 
direction.name().toLowerCase() + "-" + summary.files;
+            state.handleStreamEvent(new ProgressEvent(state.id(), new 
ProgressInfo((InetAddressAndPort) s.peer, 0, fileName, direction, fileSize, 
fileSize, fileSize)));
+            counter += fileSize;
+        }
+        return counter;
+    }
+
+    private List<StreamSummary> deterministic(Collection<StreamSummary> 
summaries)
+    {
+        // SessionInfo uses an ImmutableSet... so create a list
+        List<StreamSummary> list = new ArrayList<>(summaries);
+        // need to order so all calls with the same input return the same order
+        // if duplicates are found, the object order may be different but the 
contents will match
+        Collections.sort(list, Comparator.comparing((StreamSummary a) -> 
a.tableId.asUUID())
+                                         .thenComparingInt(a -> a.files)
+                                         .thenComparingLong(a -> a.totalSize));
+        return list;
+    }
+
     private static StreamSummary streamSummary()
     {
         int files = ThreadLocalRandom.current().nextInt(2, 10);
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java 
b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 4f0c49468d..45172fe14c 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -57,13 +57,13 @@ public class SessionInfoTest
         assert info.getTotalFilesSent() == 0;
 
         // receive in progress
-        info.updateProgress(new ProgressInfo(local, 0, "test.txt", 
ProgressInfo.Direction.IN, 50, 100));
+        info.updateProgress(new ProgressInfo(local, 0, "test.txt", 
ProgressInfo.Direction.IN, 50, 50, 100));
         // still in progress, but not completed yet
         assert info.getTotalSizeReceived() == 50;
         assert info.getTotalSizeSent() == 0;
         assert info.getTotalFilesReceived() == 0;
         assert info.getTotalFilesSent() == 0;
-        info.updateProgress(new ProgressInfo(local, 0, "test.txt", 
ProgressInfo.Direction.IN, 100, 100));
+        info.updateProgress(new ProgressInfo(local, 0, "test.txt", 
ProgressInfo.Direction.IN, 100, 100, 100));
         // 1 file should be completed
         assert info.getTotalSizeReceived() == 100;
         assert info.getTotalSizeSent() == 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to