This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new b1411a4 Fix streaming stats during entire sstable streaming b1411a4 is described below commit b1411a43180e0085ae4741f4da567a08b5a28f17 Author: Stefan Miklosovic <stefan.mikloso...@instaclustr.com> AuthorDate: Mon Apr 6 10:11:42 2020 +0200 Fix streaming stats during entire sstable streaming Patch by Stefan Miklosovic; Reviewed by Dinesh Joshi for CASSANDRA-15694 --- .../db/streaming/CassandraIncomingFile.java | 10 + .../db/streaming/CassandraOutgoingFile.java | 21 +- .../apache/cassandra/streaming/IncomingStream.java | 1 + .../apache/cassandra/streaming/OutgoingStream.java | 1 + .../cassandra/streaming/StreamReceiveTask.java | 2 +- .../cassandra/streaming/StreamTransferTask.java | 16 +- ...ntireSSTableStreamingCorrectFilesCountTest.java | 236 +++++++++++++++++++++ .../streaming/StreamTransferTaskTest.java | 4 +- 8 files changed, 278 insertions(+), 13 deletions(-) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java index c65ca62..807d935 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java @@ -46,6 +46,7 @@ public class CassandraIncomingFile implements IncomingStream private volatile SSTableMultiWriter sstable; private volatile long size = -1; + private volatile int numFiles = 1; private static final Logger logger = LoggerFactory.getLogger(CassandraIncomingFile.class); @@ -64,7 +65,10 @@ public class CassandraIncomingFile implements IncomingStream IStreamReader reader; if (streamHeader.isEntireSSTable) + { reader = new CassandraEntireSSTableStreamReader(header, streamHeader, session); + numFiles = streamHeader.componentManifest.components().size(); + } else if (streamHeader.isCompressed()) reader = new CassandraCompressedStreamReader(header, streamHeader, session); else @@ -88,6 +92,12 @@ 
public class CassandraIncomingFile implements IncomingStream } @Override + public int getNumFiles() + { + return numFiles; + } + + @Override public TableId getTableId() { Preconditions.checkState(sstable != null, "Stream hasn't been read yet"); diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java index 237c0af..0917fba 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -59,7 +59,7 @@ public class CassandraOutgoingFile implements OutgoingStream private final boolean keepSSTableLevel; private final ComponentManifest manifest; - private final boolean shouldStreamEntireSStable; + private final boolean shouldStreamEntireSSTable; public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges, @@ -72,7 +72,7 @@ public class CassandraOutgoingFile implements OutgoingStream this.sections = sections; this.filename = ref.get().getFilename(); this.manifest = getComponentManifest(ref.get()); - this.shouldStreamEntireSStable = shouldStreamEntireSSTable(); + this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables(); SSTableReader sstable = ref.get(); keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD; @@ -85,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream .withSections(sections) .withCompressionMetadata(sstable.compression ? 
sstable.getCompressionMetadata() : null) .withSerializationHeader(sstable.header.toComponent()) - .isEntireSSTable(shouldStreamEntireSStable) + .isEntireSSTable(shouldStreamEntireSSTable) .withComponentManifest(manifest) .withFirstKey(sstable.first) .withTableId(sstable.metadata().id) @@ -137,6 +137,12 @@ public class CassandraOutgoingFile implements OutgoingStream } @Override + public int getNumFiles() + { + return shouldStreamEntireSSTable ? getManifestSize() : 1; + } + + @Override public long getRepairedAt() { return ref.get().getRepairedAt(); @@ -148,6 +154,11 @@ public class CassandraOutgoingFile implements OutgoingStream return ref.get().getPendingRepair(); } + public int getManifestSize() + { + return manifest.components().size(); + } + @Override public void write(StreamSession session, DataOutputStreamPlus out, int version) throws IOException { @@ -155,7 +166,7 @@ public class CassandraOutgoingFile implements OutgoingStream CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); - if (shouldStreamEntireSStable && out instanceof AsyncStreamingOutputPlus) + if (shouldStreamEntireSSTable && out instanceof AsyncStreamingOutputPlus) { CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest); writer.write((AsyncStreamingOutputPlus) out); @@ -171,7 +182,7 @@ public class CassandraOutgoingFile implements OutgoingStream } @VisibleForTesting - public boolean shouldStreamEntireSSTable() + public boolean computeShouldStreamEntireSSTables() { // don't stream if full sstable transfers are disabled or legacy counter shards are present if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards) diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java index 18bebf5..55fbd4f 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStream.java +++ 
b/src/java/org/apache/cassandra/streaming/IncomingStream.java @@ -41,5 +41,6 @@ public interface IncomingStream String getName(); long getSize(); + int getNumFiles(); TableId getTableId(); } diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java index e71b985..4a58cae 100644 --- a/src/java/org/apache/cassandra/streaming/OutgoingStream.java +++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java @@ -49,4 +49,5 @@ public interface OutgoingStream String getName(); long getSize(); TableId getTableId(); + int getNumFiles(); } diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 87d6ce0..25977a5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -82,7 +82,7 @@ public class StreamReceiveTask extends StreamTask return; } - remoteStreamsReceived++; + remoteStreamsReceived += stream.getNumFiles(); bytesReceived += stream.getSize(); Preconditions.checkArgument(tableId.equals(stream.getTableId())); logger.debug("received {} of {} total files {} of total bytes {}", remoteStreamsReceived, totalStreams, diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index ba05acd..0f7a834 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -17,16 +17,20 @@ */ package org.apache.cassandra.streaming; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +56,8 @@ public class StreamTransferTask extends StreamTask protected final Map<Integer, OutgoingStreamMessage> streams = new HashMap<>(); private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>(); - private long totalSize; + private long totalSize = 0; + private int totalFiles = 0; public StreamTransferTask(StreamSession session, TableId tableId) { @@ -66,6 +71,7 @@ public class StreamTransferTask extends StreamTask message = StreamHook.instance.reportOutgoingStream(session, stream, message); streams.put(message.header.sequenceNumber, message); totalSize += message.stream.getSize(); + totalFiles += message.stream.getNumFiles(); } /** @@ -125,7 +131,7 @@ public class StreamTransferTask extends StreamTask public synchronized int getTotalNumberOfFiles() { - return streams.size(); + return totalFiles; } public long getTotalSize() diff --git a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java new file mode 100644 index 0000000..a57fcbc --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.net.AsyncStreamingOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.SharedDefaultFileRegion; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.schema.KeyspaceParams; +import 
org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.checkerframework.checker.nullness.qual.Nullable; + +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class EntireSSTableStreamingCorrectFilesCountTest +{ + public static final String KEYSPACE = "EntireSSTableStreamingCorrectFilesCountTest"; + public static final String CF_STANDARD = "Standard1"; + + private static SSTableReader sstable; + private static ColumnFamilyStore store; + private static RangesAtEndpoint rangesAtEndpoint; + + @BeforeClass + public static void defineSchemaAndPrepareSSTable() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD) + // LeveledCompactionStrategy is important here, + // streaming of entire SSTables works currently only with this strategy + .compaction(CompactionParams.lcs(Collections.emptyMap())) + .partitioner(ByteOrderedPartitioner.instance)); + + Keyspace keyspace = Keyspace.open(KEYSPACE); + store = keyspace.getColumnFamilyStore(CF_STANDARD); + + // insert data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + + for (int j = 0; j < 10; j++) + { + new RowUpdateBuilder(store.metadata(), j, String.valueOf(j)) + .clustering("0") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + sstable = store.getLiveSSTables().iterator().next(); + + Token start = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(0)); + Token end = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(100)); + + rangesAtEndpoint = 
RangesAtEndpoint.toDummyList(Collections.singleton(new Range<>(start, end))); + } + + @Test + public void test() throws Exception + { + FileCountingStreamEventHandler streamEventHandler = new FileCountingStreamEventHandler(); + StreamSession session = setupStreamingSessionForTest(streamEventHandler); + Collection<OutgoingStream> outgoingStreams = store.getStreamManager().createOutgoingStreams(session, + rangesAtEndpoint, + NO_PENDING_REPAIR, + PreviewKind.NONE); + + session.addTransferStreams(outgoingStreams); + DataOutputStreamPlus out = constructDataOutputStream(); + + for (OutgoingStream outgoingStream : outgoingStreams) + outgoingStream.write(session, out, MessagingService.VERSION_40); + + int totalNumberOfFiles = session.transfers.get(store.metadata.id).getTotalNumberOfFiles(); + + assertEquals(CassandraOutgoingFile.getComponentManifest(sstable).components().size(), totalNumberOfFiles); + assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles); + } + + private DataOutputStreamPlus constructDataOutputStream() + { + // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed + ByteBuf serializedFile = Unpooled.buffer(8192); + EmbeddedChannel channel = createMockNettyChannel(serializedFile); + return new AsyncStreamingOutputPlus(channel) + { + public void flush() throws IOException + { + // NO-OP + } + }; + } + + private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) + { + WritableByteChannel wbc = new WritableByteChannel() + { + private boolean isOpen = true; + + public int write(ByteBuffer src) + { + int size = src.limit(); + serializedFile.writeBytes(src); + return size; + } + + public boolean isOpen() + { + return isOpen; + } + + public void close() + { + isOpen = false; + } + }; + + return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() + { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception + { + ((SharedDefaultFileRegion) 
msg).transferTo(wbc, 0); + super.write(ctx, msg, promise); + } + }); + } + + + private StreamSession setupStreamingSessionForTest(StreamEventHandler streamEventHandler) + { + StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, + 1, + new DefaultConnectionFactory(), + false, + null, + PreviewKind.NONE); + + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), + StreamOperation.BOOTSTRAP, + Collections.singleton(streamEventHandler), + streamCoordinator); + + InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); + streamCoordinator.addSessionInfo(new SessionInfo(peer, + 0, + peer, + Collections.emptyList(), + Collections.emptyList(), + StreamSession.State.INITIALIZED)); + + StreamSession session = streamCoordinator.getOrCreateNextSession(peer); + session.init(future); + + return session; + } + + private static final class FileCountingStreamEventHandler implements StreamEventHandler + { + final Collection<String> fileNames = new ArrayList<>(); + + public void handleStreamEvent(StreamEvent event) + { + if (event.eventType == StreamEvent.Type.FILE_PROGRESS && event instanceof StreamEvent.ProgressEvent) + { + StreamEvent.ProgressEvent progressEvent = ((StreamEvent.ProgressEvent) event); + fileNames.add(progressEvent.progress.fileName); + } + } + + public void onSuccess(@Nullable StreamState streamState) + { + assert streamState != null; + assertFalse(streamState.hasFailedSession()); + } + + public void onFailure(Throwable throwable) + { + fail(); + } + } +} diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 2361125..2f4feff 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -95,7 +95,7 @@ public class StreamTransferTaskTest ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken())); task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), ranges, 1)); } - assertEquals(2, task.getTotalNumberOfFiles()); + assertEquals(14, task.getTotalNumberOfFiles()); // if file sending completes before timeout then the task should be canceled. Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS); @@ -147,7 +147,7 @@ public class StreamTransferTaskTest refs.add(ref); task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), ranges, 1)); } - assertEquals(2, task.getTotalNumberOfFiles()); + assertEquals(14, task.getTotalNumberOfFiles()); //add task to stream session, so it is aborted when stream session fails session.transfers.put(TableId.generate(), task); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org