This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push: new 5be1038c5d Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming 5be1038c5d is described below commit 5be1038c5d38af32d3cbb0545d867f21304f3a46 Author: David Capwell <dcapw...@apache.org> AuthorDate: Wed Jan 11 13:40:57 2023 -0800 Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming patch by David Capwell; reviewed by Abe Ratnofsky, Jon Meredith for CASSANDRA-18110 --- CHANGES.txt | 3 +- conf/cassandra.yaml | 10 ++ src/java/org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 27 ++++ .../org/apache/cassandra/config/DurationSpec.java | 10 ++ .../streaming/CassandraCompressedStreamReader.java | 7 +- .../streaming/CassandraCompressedStreamWriter.java | 3 +- .../CassandraEntireSSTableStreamReader.java | 2 +- .../CassandraEntireSSTableStreamWriter.java | 2 +- .../db/streaming/CassandraStreamReader.java | 7 +- .../db/streaming/CassandraStreamWriter.java | 6 +- .../apache/cassandra/streaming/ProgressInfo.java | 5 +- .../apache/cassandra/streaming/StreamEvent.java | 4 +- .../apache/cassandra/streaming/StreamManager.java | 26 ++++ .../cassandra/streaming/StreamManagerMBean.java | 20 +++ .../cassandra/streaming/StreamResultFuture.java | 15 ++- .../apache/cassandra/streaming/StreamSession.java | 22 ++-- .../apache/cassandra/streaming/StreamingState.java | 143 +++++---------------- .../management/ProgressInfoCompositeData.java | 3 + .../test/streaming/RebuildStreamingTest.java | 33 ++++- .../test/streaming/StreamingStatsDisabledTest.java | 65 ++++++++++ .../distributed/util/QueryResultUtil.java | 7 + .../db/virtual/StreamingVirtualTableTest.java | 85 +++++++++--- .../cassandra/streaming/SessionInfoTest.java | 4 +- 24 files changed, 361 insertions(+), 151 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c6ceac0575..c5d192a9c8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 4.1.1 -* Fix perpetual load of denylist on read in cases where denylist can never be loaded (CASSANDRA-18116) + * Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110) + * Fix perpetual load of denylist on read in cases where denylist can never be loaded (CASSANDRA-18116) Merged from 4.0: * Avoid ConcurrentModificationException in STCS/DTCS/TWCS.getSSTables (CASSANDRA-17977) * Restore internode custom tracing on 4.0's new messaging system (CASSANDRA-17981) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index ef7cc72605..f54c0d4617 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1161,6 +1161,16 @@ slow_query_log_timeout: 500ms # bound (for example a few nodes with big files). # streaming_connections_per_host: 1 +# Settings for stream stats tracking; used by system_views.streaming table +# How long before a stream is evicted from tracking; this impacts both historic and currently running +# streams. +# streaming_state_expires: 3d +# How much memory may be used for tracking before evicting session from tracking; once crossed +# historic and currently running streams maybe impacted. +# streaming_state_size: 40MiB +# Enable/Disable tracking of streaming stats +# streaming_stats_enabled: true + # Allows denying configurable access (rw/rr) to operations on configured ks, table, and partitions, intended for use by # operators to manage cluster health vs application access. See CASSANDRA-12106 and CEP-13 for more details. # partition_denylist_enabled: false diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index df419e780c..8a59ca2cda 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -852,6 +852,9 @@ public class Config public volatile DurationSpec.LongNanosecondsBound streaming_state_expires = new DurationSpec.LongNanosecondsBound("3d"); public volatile DataStorageSpec.LongBytesBound streaming_state_size = new DataStorageSpec.LongBytesBound("40MiB"); + public volatile boolean streaming_stats_enabled = true; + public volatile DurationSpec.IntSecondsBound streaming_slow_events_log_timeout = new DurationSpec.IntSecondsBound("10s"); + /** The configuration of startup checks. */ public volatile Map<StartupCheckType, Map<String, Object>> startup_checks = new HashMap<>(); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 62d1acdada..d2c529c3d4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -4294,6 +4294,33 @@ public class DatabaseDescriptor } } + public static boolean getStreamingStatsEnabled() + { + return conf.streaming_stats_enabled; + } + + public static void setStreamingStatsEnabled(boolean streamingStatsEnabled) + { + if (conf.streaming_stats_enabled != streamingStatsEnabled) + { + logger.info("Setting streaming_stats_enabled to {}", streamingStatsEnabled); + conf.streaming_stats_enabled = streamingStatsEnabled; + } + } + + public static DurationSpec.IntSecondsBound getStreamingSlowEventsLogTimeout() { + return conf.streaming_slow_events_log_timeout; + } + + public static void setStreamingSlowEventsLogTimeout(String value) { + DurationSpec.IntSecondsBound next = new DurationSpec.IntSecondsBound(value); + if (!conf.streaming_slow_events_log_timeout.equals(next)) + { + logger.info("Setting streaming_slow_events_log to " + value); + conf.streaming_slow_events_log_timeout = next; + } + } + public static boolean isUUIDSSTableIdentifiersEnabled() { return conf.uuid_sstable_identifiers_enabled; diff --git a/src/java/org/apache/cassandra/config/DurationSpec.java b/src/java/org/apache/cassandra/config/DurationSpec.java index ba7e9f8415..10d56c23eb 100644 --- a/src/java/org/apache/cassandra/config/DurationSpec.java +++ b/src/java/org/apache/cassandra/config/DurationSpec.java @@ -520,6 +520,16 @@ public abstract class DurationSpec return new IntSecondsBound(value); } + /** + * Returns this duration in the number of nanoseconds as an {@code int} + * + * @return this duration in number of nanoseconds or {@code Integer.MAX_VALUE} if the number of nanoseconds is too large. + */ + public int toNanoseconds() + { + return Ints.saturatedCast(unit().toNanos(quantity())); + } + /** * Returns this duration in number of milliseconds as an {@code int} * diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java index dda874ba40..005a9aaa6c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java @@ -78,6 +78,7 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); String filename = writer.getFilename(); + String sectionName = filename + '-' + fileSeqNum; int sectionIdx = 0; for (SSTableReader.PartitionPositionBounds section : sections) { @@ -89,11 +90,15 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader cis.position(section.lowerPosition); in.reset(0); + long lastBytesRead = 0; while (in.getBytesRead() < sectionLength) { writePartition(deserializer, writer); // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred - session.progress(filename + '-' + fileSeqNum, ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize); + long bytesRead = cis.chunkBytesRead(); + long bytesDelta = bytesRead - lastBytesRead; + lastBytesRead = bytesRead; + session.progress(sectionName, ProgressInfo.Direction.IN, bytesRead, bytesDelta, totalSize); } assert in.getBytesRead() == sectionLength; } diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java index 99908f261b..41fd9b1651 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java @@ -71,6 +71,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter int sectionIdx = 0; // stream each of the required sections of the file + String filename = sstable.descriptor.filenameFor(Component.DATA); for (Section section : sections) { // length of the section to stream @@ -94,7 +95,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter bytesTransferred += toTransfer; progress += toTransfer; - session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); + session.progress(filename, ProgressInfo.Direction.OUT, progress, toTransfer, totalSize); } } logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java index 261c59ef63..515c85dea6 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@ -122,7 +122,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader prettyPrintMemory(totalSize)); writer.writeComponent(component.type, in, length); - session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length); + session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length, length); bytesRead += length; logger.debug("[Stream #{}] Finished receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}", diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java index 68546cefe6..3d679a515e 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java @@ -93,7 +93,7 @@ public class CassandraEntireSSTableStreamWriter long bytesWritten = out.writeFileToChannel(channel, limiter); progress += bytesWritten; - session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, length); + session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, bytesWritten, length); logger.debug("[Stream #{}] Finished streaming {}.{} gen {} component {} to {}, xfered = {}, length = {}, totalSize = {}", session.planId(), diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 2cccee3a0a..04268f024c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -126,11 +126,16 @@ public class CassandraStreamReader implements IStreamReader TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + String sequenceName = writer.getFilename() + '-' + fileSeqNum; + long lastBytesRead = 0; while (in.getBytesRead() < totalSize) { writePartition(deserializer, writer); // TODO move this to BytesReadTracker - session.progress(writer.getFilename() + '-' + fileSeqNum, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + long bytesRead = in.getBytesRead(); + long bytesDelta = bytesRead - lastBytesRead; + lastBytesRead = bytesRead; + session.progress(sequenceName, ProgressInfo.Direction.IN, bytesRead, bytesDelta, totalSize); } logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java index d69f3eec59..9d9ea3c1fc 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java @@ -94,6 +94,7 @@ public class CassandraStreamWriter long progress = 0L; // stream each of the required sections of the file + String filename = sstable.descriptor.filenameFor(Component.DATA); for (SSTableReader.PartitionPositionBounds section : sections) { long start = validator == null ? section.lowerPosition : validator.chunkStart(section.lowerPosition); @@ -112,8 +113,9 @@ public class CassandraStreamWriter long lastBytesRead = write(proxy, validator, out, start, transferOffset, toTransfer, bufferSize); start += lastBytesRead; bytesRead += lastBytesRead; - progress += (lastBytesRead - transferOffset); - session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); + long delta = lastBytesRead - transferOffset; + progress += delta; + session.progress(filename, ProgressInfo.Direction.OUT, progress, delta, totalSize); transferOffset = 0; } diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java index 2b306f8c1b..2ed78aca27 100644 --- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java +++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -54,9 +54,11 @@ public class ProgressInfo implements Serializable public final String fileName; public final Direction direction; public final long currentBytes; + public final long deltaBytes; // change from previous ProgressInfo public final long totalBytes; - public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes) + public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String fileName, Direction direction, + long currentBytes, long deltaBytes, long totalBytes) { assert totalBytes > 0; @@ -65,6 +67,7 @@ public class ProgressInfo implements Serializable this.fileName = fileName; this.direction = direction; this.currentBytes = currentBytes; + this.deltaBytes = deltaBytes; this.totalBytes = totalBytes; } diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java index be7ad3dea6..ff83a191e9 100644 --- a/src/java/org/apache/cassandra/streaming/StreamEvent.java +++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java @@ -87,11 +87,13 @@ public abstract class StreamEvent public static class SessionPreparedEvent extends StreamEvent { public final SessionInfo session; + public final StreamSession.PrepareDirection prepareDirection; - public SessionPreparedEvent(TimeUUID planId, SessionInfo session) + public SessionPreparedEvent(TimeUUID planId, SessionInfo session, StreamSession.PrepareDirection prepareDirection) { super(Type.STREAM_PREPARED, planId); this.session = session; + this.prepareDirection = prepareDirection; } } } diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index 46ab422eff..408b6f4abe 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -230,6 +230,8 @@ public class StreamManager implements StreamManagerMBean @Override public void onRegister(StreamResultFuture result) { + if (!DatabaseDescriptor.getStreamingStatsEnabled()) + return; // reason for synchronized rather than states.get is to detect duplicates // streaming shouldn't be producing duplicates as that would imply a planId collision synchronized (states) @@ -312,6 +314,30 @@ public class StreamManager implements StreamManagerMBean })); } + @Override + public boolean getStreamingStatsEnabled() + { + return DatabaseDescriptor.getStreamingStatsEnabled(); + } + + @Override + public void setStreamingStatsEnabled(boolean streamingStatsEnabled) + { + DatabaseDescriptor.setStreamingStatsEnabled(streamingStatsEnabled); + } + + @Override + public String getStreamingSlowEventsLogTimeout() + { + return DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toString(); + } + + @Override + public void setStreamingSlowEventsLogTimeout(String value) + { + DatabaseDescriptor.setStreamingSlowEventsLogTimeout(value); + } + public void registerInitiator(final StreamResultFuture result) { result.addEventListener(notifier); diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java index f329596c4a..e49c059590 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java +++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java @@ -29,4 +29,24 @@ public interface StreamManagerMBean extends NotificationEmitter * Returns the current state of all ongoing streams. */ Set<CompositeData> getCurrentStreams(); + + /** + * @return whether the streaming virtual table should collect stats while streaming is running + */ + boolean getStreamingStatsEnabled(); + + /** + * enable/disable collection of streaming stats while streaming is running. + */ + void setStreamingStatsEnabled(boolean streamingStatsEnabled); + + /** + * @return current timeout for streaming slow events log + */ + String getStreamingSlowEventsLogTimeout(); + + /** + * Sets the timeout for the streaming slow events log + */ + void setStreamingSlowEventsLogTimeout(String value); } diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 22af238de6..b43203dee3 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -17,11 +17,15 @@ */ package org.apache.cassandra.streaming; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.AsyncFuture; import org.slf4j.Logger; @@ -31,6 +35,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.streaming.StreamingChannel.Factory.Global.streamingFactory; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; /** * A future on the result ({@link StreamState}) of a streaming plan. @@ -53,6 +58,7 @@ public final class StreamResultFuture extends AsyncFuture<StreamState> public final StreamOperation streamOperation; private final StreamCoordinator coordinator; private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>(); + private final long slowEventsLogTimeoutNanos = DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toNanoseconds(); /** * Create new StreamResult of given {@code planId} and streamOperation. @@ -175,7 +181,7 @@ public final class StreamResultFuture extends AsyncFuture<StreamState> return planId.hashCode(); } - void handleSessionPrepared(StreamSession session) + void handleSessionPrepared(StreamSession session, StreamSession.PrepareDirection prepareDirection) { SessionInfo sessionInfo = session.getSessionInfo(); logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({}), sending {} files({})", @@ -185,7 +191,7 @@ public final class StreamResultFuture extends AsyncFuture<StreamState> FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToReceive()), sessionInfo.getTotalFilesToSend(), FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToSend())); - StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo); + StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo, prepareDirection); coordinator.addSessionInfo(sessionInfo); fireStreamEvent(event); } @@ -208,6 +214,7 @@ public final class StreamResultFuture extends AsyncFuture<StreamState> synchronized void fireStreamEvent(StreamEvent event) { // delegate to listener + long startNanos = nanoTime(); for (StreamEventHandler listener : eventListeners) { try @@ -219,6 +226,10 @@ public final class StreamResultFuture extends AsyncFuture<StreamState> logger.warn("Unexpected exception in listern while calling handleStreamEvent", t); } } + long totalNanos = nanoTime() - startNanos; + if (totalNanos > slowEventsLogTimeoutNanos) + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}", + () -> new Object[] { Duration.ofNanos(slowEventsLogTimeoutNanos), Duration.ofNanos(totalNanos)}); } private synchronized void maybeComplete() diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index a540a1b6d0..811717f85d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -148,6 +148,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); + public enum PrepareDirection { SEND, ACK } + // for test purpose to record received message and state transition public volatile static MessageStateSink sink = MessageStateSink.NONE; @@ -738,7 +740,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber for (StreamTransferTask task : transfers.values()) prepareSynAck.summaries.add(task.getSummary()); - streamResult.handleSessionPrepared(this); + streamResult.handleSessionPrepared(this, PrepareDirection.SEND); // After sending the message the initiator can close the channel which will cause a ClosedChannelException // in buffer logic, this then gets sent to onError which validates the state isFinalState, if not fails // the session. To avoid a race condition between sending and setting state, make sure to update the state @@ -769,14 +771,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber if (isPreview()) completePreview(); else - startStreamingFiles(true); + startStreamingFiles(PrepareDirection.ACK); } private void prepareAck(PrepareAckMessage msg) { if (isPreview()) throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", planId())); - startStreamingFiles(true); + startStreamingFiles(PrepareDirection.ACK); } /** @@ -845,9 +847,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber } } - public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total) + public void progress(String filename, ProgressInfo.Direction direction, long bytes, long delta, long total) { - ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total); + if (delta < 0) + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, + "[id={}, key={{}, {}, {})] Stream event reported a negative delta ({})", + planId(), peer, filename, direction, delta); + ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, delta, total); streamResult.handleProgress(progress); } @@ -1008,10 +1014,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize)); } - private void startStreamingFiles(boolean notifyPrepared) + private void startStreamingFiles(@Nullable PrepareDirection prepareDirection) { - if (notifyPrepared) - streamResult.handleSessionPrepared(this); + if (prepareDirection != null) + streamResult.handleSessionPrepared(this, prepareDirection); state(State.STREAMING); diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java b/src/java/org/apache/cassandra/streaming/StreamingState.java index 23685bd0ef..c2eed1ea9e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamingState.java +++ b/src/java/org/apache/cassandra/streaming/StreamingState.java @@ -20,16 +20,16 @@ package org.apache.cassandra.streaming; import java.math.BigDecimal; import java.math.RoundingMode; import java.net.InetSocketAddress; -import java.util.Collection; import java.util.Collections; import java.util.EnumMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,18 +54,12 @@ public class StreamingState implements StreamEventHandler private final long createdAtMillis = Clock.Global.currentTimeMillis(); - // while streaming is running, this is a cache of StreamInfo seen with progress state - // the reason for the cache is that StreamSession drops data after tasks (recieve/send) complete, this makes - // it so that current state of a future tracks work pending rather than work done, cache solves this by not deleting - // when tasks complete - // To lower memory costs, clear this after the stream completes - private ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = new ConcurrentHashMap<>(); - private final TimeUUID id; private final boolean follower; private final StreamOperation operation; - private Set<InetSocketAddress> peers = null; - private Sessions sessions = Sessions.EMPTY; + private final Set<InetSocketAddress> peers = Collections.newSetFromMap(new ConcurrentHashMap<>()); + @GuardedBy("this") + private final Sessions sessions = new Sessions(); private Status status; private String completeMessage = null; @@ -107,13 +101,7 @@ public class StreamingState implements StreamEventHandler public Set<InetSocketAddress> peers() { - Set<InetSocketAddress> peers = this.peers; - if (peers != null) - return peers; - ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = this.streamProgress; - if (streamProgress != null) - return streamProgress.keySet(); - return Collections.emptySet(); + return this.peers; } public Status status() @@ -138,6 +126,7 @@ public class StreamingState implements StreamEventHandler } } + @VisibleForTesting public StreamResultFuture future() { if (follower) @@ -225,12 +214,6 @@ public class StreamingState implements StreamEventHandler @Override public synchronized void handleStreamEvent(StreamEvent event) { - ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = this.streamProgress; - if (streamProgress == null) - { - logger.warn("Got stream event {} after the stream completed", event.eventType); - return; - } try { switch (event.eventType) @@ -252,52 +235,52 @@ public class StreamingState implements StreamEventHandler { logger.warn("Unexpected exception handling stream event", t); } - sessions = Sessions.create(streamProgress.values()); lastUpdatedAtNanos = Clock.Global.nanoTime(); } private void streamPrepared(StreamEvent.SessionPreparedEvent event) { - SessionInfo session = new SessionInfo(event.session); - streamProgress.putIfAbsent(session.peer, session); + SessionInfo session = event.session; + peers.add(session.peer); + // only update stats on ACK to avoid duplication + if (event.prepareDirection != StreamSession.PrepareDirection.ACK) + return; + sessions.bytesToReceive += session.getTotalSizeToReceive(); + sessions.bytesToSend += session.getTotalSizeToSend(); + + sessions.filesToReceive += session.getTotalFilesToReceive(); + sessions.filesToSend += session.getTotalFilesToSend(); } private void streamProgress(StreamEvent.ProgressEvent event) { - SessionInfo info = streamProgress.get(event.progress.peer); - if (info != null) + ProgressInfo info = event.progress; + + if (info.direction == ProgressInfo.Direction.IN) { - info.updateProgress(event.progress); + // receiving + sessions.bytesReceived += info.deltaBytes; + if (info.isCompleted()) + sessions.filesReceived++; } else { - logger.warn("[Stream #{}} ID#{}] Recieved stream progress before prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer); + // sending + sessions.bytesSent += info.deltaBytes; + if (info.isCompleted()) + sessions.filesSent++; } } @Override public synchronized void onSuccess(@Nullable StreamState state) { - ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = this.streamProgress; - if (streamProgress != null) - { - sessions = Sessions.create(streamProgress.values()); - peers = new HashSet<>(streamProgress.keySet()); - this.streamProgress = null; - updateState(Status.SUCCESS); - } + updateState(Status.SUCCESS); } @Override public synchronized void onFailure(Throwable throwable) { - ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = this.streamProgress; - if (streamProgress != null) - { - sessions = Sessions.create(streamProgress.values()); - peers = new HashSet<>(streamProgress.keySet()); - this.streamProgress = null; - } completeMessage = Throwables.getStackTraceAsString(throwable); updateState(Status.FAILURE); } @@ -326,24 +309,10 @@ public class StreamingState implements StreamEventHandler public static class Sessions { - public static final Sessions EMPTY = new Sessions(0, 0, 0, 0, 0, 0, 0, 0); - - public final long bytesToReceive, bytesReceived; - public final long bytesToSend, bytesSent; - public final long filesToReceive, filesReceived; - public final long filesToSend, filesSent; - - public Sessions(long bytesToReceive, long bytesReceived, long bytesToSend, long bytesSent, long filesToReceive, long filesReceived, long filesToSend, long filesSent) - { - this.bytesToReceive = bytesToReceive; - this.bytesReceived = bytesReceived; - this.bytesToSend = bytesToSend; - this.bytesSent = bytesSent; - this.filesToReceive = filesToReceive; - this.filesReceived = filesReceived; - this.filesToSend = filesToSend; - this.filesSent = filesSent; - } + public long bytesToReceive, bytesReceived; + public long bytesToSend, bytesSent; + public long filesToReceive, filesReceived; + public long filesToSend, filesSent; public static String columns() { @@ -357,51 +326,9 @@ public class StreamingState implements StreamEventHandler " files_sent bigint, \n"; } - public static Sessions create(Collection<SessionInfo> sessions) - { - long bytesToReceive = 0; - long bytesReceived = 0; - long filesToReceive = 0; - long filesReceived = 0; - long bytesToSend = 0; - long bytesSent = 0; - long filesToSend = 0; - long filesSent = 0; - for (SessionInfo session : sessions) - { - bytesToReceive += session.getTotalSizeToReceive(); - bytesReceived += session.getTotalSizeReceived(); - - filesToReceive += session.getTotalFilesToReceive(); - filesReceived += session.getTotalFilesReceived(); - - bytesToSend += session.getTotalSizeToSend(); - bytesSent += session.getTotalSizeSent(); - - filesToSend += session.getTotalFilesToSend(); - filesSent += session.getTotalFilesSent(); - } - if (0 == bytesToReceive && 0 == bytesReceived && 0 == filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0 == filesToSend && 0 == filesSent) - return EMPTY; - return new Sessions(bytesToReceive, bytesReceived, - bytesToSend, bytesSent, - filesToReceive, filesReceived, - filesToSend, filesSent); - } - public boolean isEmpty() { - return this == EMPTY; - } - - public BigDecimal receivedBytesPercent() - { - return div(bytesReceived, bytesToReceive); - } - - public BigDecimal sentBytesPercent() - { - return div(bytesSent, bytesToSend); + return bytesToReceive == 0 && bytesToSend == 0 && filesToReceive == 0 && filesToSend == 0; } public BigDecimal progress() diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java index a3eb7d1f34..72ab84407a 100644 --- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java @@ -74,6 +74,8 @@ public class ProgressInfoCompositeData public static CompositeData toCompositeData(TimeUUID planId, ProgressInfo progressInfo) { + // Delta is not returned as it wasn't clear the impact to backwards compatability; it may be safe to expose. + // see CASSANDRA-18110 Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], planId.toString()); valueMap.put(ITEM_NAMES[1], progressInfo.peer.getAddress().getHostAddress()); @@ -103,6 +105,7 @@ public class ProgressInfoCompositeData (String) values[4], ProgressInfo.Direction.valueOf((String)values[5]), (long) values[6], + (long) values[6], (long) values[7]); } catch (UnknownHostException e) diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java index db896697fb..ee41ec42d7 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.distributed.test.streaming; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import org.junit.Test; @@ -29,28 +30,49 @@ import org.apache.cassandra.distributed.api.Row; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.distributed.util.QueryResultUtil; +import org.assertj.core.api.Assertions; import static org.assertj.core.api.Assertions.assertThat; public class RebuildStreamingTest extends TestBaseImpl { + private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]); + // zero copy streaming sends all components, so the events will include non-Data files as well + private static final int NUM_COMPONENTS = 7; + + @Test + public void zeroCopy() throws IOException + { + test(true); + } + @Test - public void test() throws IOException + public void notZeroCopy() throws IOException + { + test(false); + } + + private void test(boolean zeroCopyStreaming) throws IOException { try (Cluster cluster = init(Cluster.build(2) - .withConfig(c -> c.with(Feature.values()).set("stream_entire_sstables", false)) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s")) .start())) { - cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar PRIMARY KEY);")); + // streaming sends events every 65k, so need to make sure that the files are larger than this to hit + // all cases of the vtable + cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' : false };")); cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()); IInvokableInstance first = cluster.get(1); IInvokableInstance second = cluster.get(2); long expectedFiles = 10; for (int i = 0; i < expectedFiles; i++) { - first.executeInternal(withKeyspace("insert into %s.users(user_id) values (?)"), "dcapwell" + i); + first.executeInternal(withKeyspace("insert into %s.users(user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB); first.flush(KEYSPACE); } + if (zeroCopyStreaming) // will include all components so need to account for + expectedFiles *= NUM_COMPONENTS; second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success(); @@ -91,6 +113,9 @@ public class RebuildStreamingTest extends TestBaseImpl .columnsEqualTo("files_to_receive", "files_received").isEqualTo("files_received", expectedFiles) .columnsEqualTo("bytes_to_receive", "bytes_received").isEqualTo("bytes_received", totalBytes) .columnsEqualTo("files_sent", "files_to_send", "bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L); + + // did we trigger slow event log? + cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling streaming events took longer than").getResult()).describedAs("Unable to find slow log for node%d", i.config().num()).isNotEmpty()); } } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java new file mode 100644 index 0000000000..f3d4394d21 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.streaming; + +import java.io.IOException; +import java.util.Arrays; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.util.QueryResultUtil; +import org.apache.cassandra.streaming.StreamManager; + +public class StreamingStatsDisabledTest extends TestBaseImpl +{ + @Test + public void test() throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.with(Feature.values()).set("streaming_stats_enabled", false)) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar, PRIMARY KEY (user_id));")); + cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()); + + long expectedFiles = 10; + for (int i = 0; i < expectedFiles; i++) + { + cluster.get(1).executeInternal(withKeyspace("insert into %s.users(user_id) values (?)"), "dcapwell" + i); + cluster.get(1).flush(KEYSPACE); + } + + cluster.get(2).nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success(); + for (int nodeId : Arrays.asList(1, 2)) + QueryResultUtil.assertThat(cluster.get(nodeId).executeInternalWithResult("SELECT * FROM system_views.streaming")).isEmpty(); + + // trigger streaming again + cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.users(user_id) VALUES ('trigger streaming')")); + // mimic JMX + cluster.get(2).runOnInstance(() -> StreamManager.instance.setStreamingStatsEnabled(true)); + cluster.get(2).nodetoolResult("repair", KEYSPACE).asserts().success(); + + QueryResultUtil.assertThat(cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.streaming")).isEmpty(); + QueryResultUtil.assertThat(cluster.get(2).executeInternalWithResult("SELECT * FROM system_views.streaming")).hasSize(1); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java index 58842bc691..a502e8c1e6 100644 --- a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java +++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Objects; import java.util.function.Predicate; +import com.google.monitoring.runtime.instrumentation.common.collect.Iterators; import org.apache.cassandra.distributed.api.Row; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; @@ -176,5 +177,11 @@ public class QueryResultUtil Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size); return this; } + + public void isEmpty() + { + int size = Iterators.size(qr); + Assertions.assertThat(size).describedAs("QueryResult is not empty").isEqualTo(0); + } } } diff --git a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java index cce995688e..c8e3d89f6b 100644 --- a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java @@ -18,9 +18,13 @@ package org.apache.cassandra.db.virtual; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; +import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -34,9 +38,11 @@ import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.ProgressInfo.Direction; import org.apache.cassandra.streaming.SessionInfo; import org.apache.cassandra.streaming.StreamCoordinator; import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEvent.ProgressEvent; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamResultFuture; @@ -90,7 +96,7 @@ public class StreamingVirtualTableTest extends CQLTester assertRows(execute(t("select id, follower, operation, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), new Object[] { state.id(), true, "Repair", Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()), null, null }); - state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), Collections.emptyList(), StreamSession.State.PREPARING))); + state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), Collections.emptyList(), StreamSession.State.PREPARING), StreamSession.PrepareDirection.ACK)); state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR, ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), Collections.emptyList(), StreamSession.State.COMPLETE)))); assertRows(execute(t("select id, follower, operation, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), @@ -118,8 +124,9 @@ public class StreamingVirtualTableTest extends CQLTester SessionInfo s1 = new SessionInfo(PEER2, 0, FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING); SessionInfo s2 = new SessionInfo(PEER3, 0, FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING); - state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), s1)); - state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), s2)); + // we only update stats on ACK + state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), s1, StreamSession.PrepareDirection.ACK)); + state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), s2, StreamSession.PrepareDirection.ACK)); long bytesToReceive = 0, bytesToSend = 0; long filesToReceive = 0, filesToSend = 0; @@ -133,31 +140,34 @@ public class StreamingVirtualTableTest extends CQLTester assertRows(execute(t("select id, follower, peers, status, progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, files_received, files_to_send, files_sent from %s")), new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 0F, bytesToReceive, 0L, bytesToSend, 0L, filesToReceive, 0L, filesToSend, 0L }); - // update progress + // update progress; sent all but 1 file long bytesReceived = 0, bytesSent = 0; + long filesReceived = 0, filesSent = 0; for (SessionInfo s : Arrays.asList(s1, s2)) { - long in = s.getTotalFilesToReceive() - 1; - long inBytes = s.getTotalSizeToReceive() - in; - long out = s.getTotalFilesToSend() - 1; - long outBytes = s.getTotalSizeToSend() - out; - state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", ProgressInfo.Direction.IN, inBytes, inBytes))); - state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", ProgressInfo.Direction.OUT, outBytes, outBytes))); - bytesReceived += inBytes; - bytesSent += outBytes; + List<StreamSummary> receiving = deterministic(s.receivingSummaries); + bytesReceived += progressEvent(state, s, receiving, Direction.IN); + filesReceived += receiving.stream().mapToInt(ss -> ss.files - 1).sum(); + + List<StreamSummary> sending = deterministic(s.sendingSummaries); + bytesSent += progressEvent(state, s, sending, Direction.OUT); + filesSent += sending.stream().mapToInt(ss -> ss.files - 1).sum(); } assertRows(execute(t("select id, follower, peers, status, bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, files_received, files_to_send, files_sent from %s")), - new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive, bytesReceived, bytesToSend, bytesSent, filesToReceive, 2L, filesToSend, 2L }); + new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive, bytesReceived, bytesToSend, bytesSent, filesToReceive, filesReceived, filesToSend, filesSent }); // finish for (SessionInfo s : Arrays.asList(s1, s2)) { // complete the rest - for (long i = 1; i < s.getTotalFilesToReceive(); i++) - state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, Long.toString(i), ProgressInfo.Direction.IN, 1, 1))); - for (long i = 1; i < s.getTotalFilesToSend(); i++) - state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, Long.toString(i), ProgressInfo.Direction.OUT, 1, 1))); + List<StreamSummary> receiving = deterministic(s.receivingSummaries); + bytesReceived += completeEvent(state, s, receiving, Direction.IN); + filesReceived += receiving.stream().mapToInt(ss -> ss.files - 1).sum(); + + List<StreamSummary> sending = deterministic(s.sendingSummaries); + bytesSent += completeEvent(state, s, sending, Direction.OUT); + filesSent += sending.stream().mapToInt(ss -> ss.files - 1).sum(); } assertRows(execute(t("select id, follower, peers, status, progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, files_received, files_to_send, files_sent from %s")), @@ -168,6 +178,47 @@ public class StreamingVirtualTableTest extends CQLTester new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "success", 100F, new Date(state.lastUpdatedAtMillis()), null, null }); } + private static long progressEvent(StreamingState state, SessionInfo s, List<StreamSummary> summaries, Direction direction) + { + long counter = 0; + for (StreamSummary summary : summaries) + { + long fileSize = summary.totalSize / summary.files; + for (int i = 0; i < summary.files - 1; i++) + { + String fileName = summary.tableId + "-" + direction.name().toLowerCase() + "-" + i; + state.handleStreamEvent(new ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, fileName, direction, fileSize, fileSize, fileSize))); + counter += fileSize; + } + } + return counter; + } + + private static long completeEvent(StreamingState state, SessionInfo s, List<StreamSummary> summaries, Direction direction) + { + long counter = 0; + for (StreamSummary summary : summaries) + { + long fileSize = summary.totalSize / summary.files; + String fileName = summary.tableId + "-" + direction.name().toLowerCase() + "-" + summary.files; + state.handleStreamEvent(new ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, fileName, direction, fileSize, fileSize, fileSize))); + counter += fileSize; + } + return counter; + } + + private List<StreamSummary> deterministic(Collection<StreamSummary> summaries) + { + // SessionInfo uses a ImmutableSet... so create a list + List<StreamSummary> list = new ArrayList<>(summaries); + // need to order so all calls with the same input return the same order + // if duplicates are found, the object order may be different but the contents will match + Collections.sort(list, Comparator.comparing((StreamSummary a) -> a.tableId.asUUID()) + .thenComparingInt(a -> a.files) + .thenComparingLong(a -> a.totalSize)); + return list; + } + private static StreamSummary streamSummary() { int files = ThreadLocalRandom.current().nextInt(2, 10); diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java index 4f0c49468d..45172fe14c 100644 --- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java +++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java @@ -57,13 +57,13 @@ public class SessionInfoTest assert info.getTotalFilesSent() == 0; // receive in progress - info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 50, 100)); + info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 50, 50, 100)); // still in progress, but not completed yet assert info.getTotalSizeReceived() == 50; assert info.getTotalSizeSent() == 0; assert info.getTotalFilesReceived() == 0; assert info.getTotalFilesSent() == 0; - info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 100, 100)); + info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 100, 100, 100)); // 1 file should be completed assert info.getTotalSizeReceived() == 100; assert info.getTotalSizeSent() == 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org