This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 31bea0b0d41e4e81095f0d088094f03db14af490 Author: Benedict Elliott Smith <bened...@apple.com> AuthorDate: Wed Nov 17 14:50:09 2021 +0000 [CEP-10] Cluster and Code Simulations: Minor fixes - fix repair timeout - fix secondary index flushing - fix race condition with Ref - fix resource leaks - fix incorrect exists() - fix repair error reporting when null exception message - fix dtest use of System.nanoTime patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-17008 --- .../apache/cassandra/cache/AutoSavingCache.java | 1 - .../cassandra/concurrent/NamedThreadFactory.java | 2 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- src/java/org/apache/cassandra/db/Directories.java | 1 - src/java/org/apache/cassandra/db/ReadResponse.java | 1 - .../org/apache/cassandra/db/SystemKeyspace.java | 1 - .../cassandra/db/WindowsFailedSnapshotTracker.java | 2 -- .../commitlog/CommitLogSegmentManagerStandard.java | 1 - .../apache/cassandra/db/compaction/Scrubber.java | 1 - .../apache/cassandra/db/compaction/Verifier.java | 11 ++----- .../compaction/writers/CompactionAwareWriter.java | 1 - .../apache/cassandra/db/lifecycle/LogRecord.java | 1 - .../org/apache/cassandra/gms/EndpointState.java | 1 - .../org/apache/cassandra/gms/GossipDigest.java | 1 - .../org/apache/cassandra/gms/GossipDigestAck2.java | 1 - .../org/apache/cassandra/gms/GossipDigestSyn.java | 1 - .../org/apache/cassandra/gms/HeartBeatState.java | 1 - .../org/apache/cassandra/gms/TokenSerializer.java | 3 -- .../org/apache/cassandra/gms/VersionedValue.java | 1 - .../cassandra/hints/ChecksummedDataInput.java | 1 - .../apache/cassandra/index/sasi/TermIterator.java | 4 --- .../index/sasi/disk/OnDiskIndexBuilder.java | 1 - .../io/compress/CompressedSequentialWriter.java | 1 - .../cassandra/io/compress/CompressionMetadata.java | 3 -- .../apache/cassandra/io/sstable/Descriptor.java | 2 -- .../io/sstable/SSTableIdentityIterator.java | 1 - .../cassandra/io/sstable/format/SSTableReader.java | 30 
++++++++++------- .../io/sstable/format/SSTableReaderBuilder.java | 2 -- .../io/sstable/format/big/BigTableWriter.java | 2 +- .../io/sstable/metadata/MetadataSerializer.java | 1 - .../io/util/BufferedDataOutputStreamPlus.java | 3 ++ .../org/apache/cassandra/io/util/FileHandle.java | 12 +++++-- .../org/apache/cassandra/io/util/PathUtils.java | 2 -- src/java/org/apache/cassandra/net/Verb.java | 2 +- .../repair/consistent/SyncStatSummary.java | 1 - .../schema/SystemDistributedKeyspace.java | 5 ++- .../apache/cassandra/service/StartupChecks.java | 2 -- .../apache/cassandra/service/StorageService.java | 1 - .../org/apache/cassandra/service/paxos/Commit.java | 34 +++++++++++++++++++ .../apache/cassandra/service/paxos/PaxosState.java | 6 ++-- .../service/snapshot/SnapshotManifest.java | 14 ++++++-- .../cassandra/streaming/StreamResultFuture.java | 5 ++- .../apache/cassandra/streaming/StreamSession.java | 9 +++++ .../cassandra/streaming/StreamTransferTask.java | 38 +++++++++++++--------- .../async/StreamingMultiplexedChannel.java | 1 + .../cassandra/tools/BulkLoadConnectionFactory.java | 1 - .../cassandra/utils/BloomFilterSerializer.java | 2 -- .../org/apache/cassandra/utils/FBUtilities.java | 2 +- .../cassandra/utils/binlog/ExternalArchiver.java | 1 - .../apache/cassandra/utils/concurrent/Threads.java | 5 +++ .../apache/cassandra/utils/obs/OffHeapBitSet.java | 1 - .../cassandra/distributed/impl/Coordinator.java | 2 +- .../apache/cassandra/distributed/impl/Query.java | 4 ++- .../streaming/StreamTransferTaskTest.java | 1 + 54 files changed, 140 insertions(+), 96 deletions(-) diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 03cbde2..0e022d4 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -27,7 +27,6 @@ import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; 
-import org.apache.cassandra.io.util.File; import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index 88e0d10..9816649 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -177,7 +177,7 @@ public class NamedThreadFactory implements ThreadFactory @Override public String toString() { - return id; + return threadGroup != null ? id + " in " + threadGroup.getName() : id; } public void close() diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index a9bcdaa..be03a34 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -297,7 +297,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } }; - ScheduledExecutors.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleSelfRecurring(runnable, period, TimeUnit.MILLISECONDS); } } diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 8b48c5c..c2ad4bd 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.nio.file.FileStore; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiPredicate; diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 568b1a1..52e6fd5 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ 
b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -30,7 +30,6 @@ import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 6ff22df..3ce81e3 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -38,7 +38,6 @@ import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; -import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java index 2cb5eb1..b61c4d0 100644 --- a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java +++ b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java @@ -23,8 +23,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; -import java.nio.file.Files; -import java.nio.file.Paths; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.io.util.File; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java index c144d09..0e051cf 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java @@ -19,7 +19,6 
@@ package org.apache.cassandra.db.commitlog; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentManager diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 666b037..ebb2303 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -44,7 +44,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.utils.memory.HeapAllocator; diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 8b7f0d6..5a73416 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -36,7 +36,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; -import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.DataIntegrityMetadata.FileDigestValidator; import org.apache.cassandra.io.util.FileInputStreamPlus; @@ -52,16 +51,12 @@ import org.apache.cassandra.utils.IFilter; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; -import java.io.BufferedInputStream; import java.io.Closeable; -import java.io.DataInput; import 
java.io.DataInputStream; import java.io.IOError; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -448,10 +443,10 @@ public class Verifier implements Closeable private void deserializeBloomFilter(SSTableReader sstable) throws IOException { - Path bfPath = Paths.get(sstable.descriptor.filenameFor(Component.FILTER)); - if (Files.exists(bfPath)) + File bfPath = new File(sstable.descriptor.filenameFor(Component.FILTER)); + if (bfPath.exists()) { - try (FileInputStreamPlus stream = new File(bfPath).newInputStream(); + try (FileInputStreamPlus stream = bfPath.newInputStream(); IFilter bf = BloomFilterSerializer.deserialize(stream, sstable.descriptor.version.hasOldBfFormat())) { } diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 74ebac7..2251f6a 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java index 4fb3947..45653c4 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java @@ -22,7 +22,6 @@ package org.apache.cassandra.db.lifecycle; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.*; import java.util.function.BiPredicate; import java.util.regex.Matcher; diff --git 
a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index b7f6bdb..2cc9c0d 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -26,7 +26,6 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/gms/GossipDigest.java b/src/java/org/apache/cassandra/gms/GossipDigest.java index 4115c38..53f6c5c 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigest.java +++ b/src/java/org/apache/cassandra/gms/GossipDigest.java @@ -23,7 +23,6 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.InetAddressAndPort; import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java index 7324763..0e4062b 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java @@ -25,7 +25,6 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.InetAddressAndPort; import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java index c2c736f..7c2ae94 
100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java @@ -30,7 +30,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; * This is the first message that gets sent out as a start of the Gossip protocol in a * round. */ -import org.apache.cassandra.io.util.File; public class GossipDigestSyn { diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java b/src/java/org/apache/cassandra/gms/HeartBeatState.java index d0a7142..cad6a48 100644 --- a/src/java/org/apache/cassandra/gms/HeartBeatState.java +++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java @@ -29,7 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; /** * HeartBeat State associated with any given endpoint. */ -import org.apache.cassandra.io.util.File; public class HeartBeatState { diff --git a/src/java/org/apache/cassandra/gms/TokenSerializer.java b/src/java/org/apache/cassandra/gms/TokenSerializer.java index d73b077..0048e7c 100644 --- a/src/java/org/apache/cassandra/gms/TokenSerializer.java +++ b/src/java/org/apache/cassandra/gms/TokenSerializer.java @@ -30,9 +30,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; - -import org.apache.cassandra.io.util.File; - public class TokenSerializer { private static final Logger logger = LoggerFactory.getLogger(TokenSerializer.class); diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 659f61b..880cb98 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.cassandra.io.util.File; import static java.nio.charset.StandardCharsets.ISO_8859_1; import com.google.common.annotations.VisibleForTesting; diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 463f33e..339e45e 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.*; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.NativeLibrary; diff --git a/src/java/org/apache/cassandra/index/sasi/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/TermIterator.java index c84b209..d65b386 100644 --- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java +++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java @@ -20,11 +20,9 @@ package org.apache.cassandra.index.sasi; import java.util.List; import java.util.Set; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import io.netty.util.concurrent.FastThreadLocal; -import org.apache.cassandra.concurrent.ExecutorFactory; import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.index.sasi.disk.Token; @@ -33,8 +31,6 @@ import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.io.util.FileUtils; -import com.google.common.util.concurrent.MoreExecutors; - import org.apache.cassandra.utils.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java index 9ba9f9c..a6faa04 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java +++ 
b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java @@ -31,7 +31,6 @@ import org.apache.cassandra.index.sasi.sa.SuffixSA; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.*; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 8321345..024e4ef 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -30,7 +30,6 @@ import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.*; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.ByteBufferUtil; diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index cc6ce6b..5af9c92 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -17,11 +17,8 @@ */ package org.apache.cassandra.io.compress; -import java.nio.file.Files; import java.nio.file.NoSuchFileException; -import java.nio.file.Paths; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.EOFException; diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index f4e7f00..7c70b5a 100644 --- 
a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.io.sstable; -import java.io.IOError; -import java.io.IOException; import java.util.*; import java.util.regex.Pattern; diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 2b32278..76e12c8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable; import java.io.*; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 804a59a..998d374 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -45,7 +45,6 @@ import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.cache.InstrumentingCache; import org.apache.cassandra.cache.KeyCacheKey; -import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; @@ -2212,18 +2211,27 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static Ref<GlobalTidy> get(SSTableReader sstable) { Descriptor descriptor = sstable.descriptor; - Ref<GlobalTidy> refc = lookup.get(descriptor); - if (refc != null) - return refc.ref(); - final GlobalTidy tidy = new GlobalTidy(sstable); - refc = new Ref<>(tidy, tidy); - 
Ref<?> ex = lookup.putIfAbsent(descriptor, refc); - if (ex != null) + + while (true) { - refc.close(); - throw new AssertionError(); + Ref<GlobalTidy> ref = lookup.get(descriptor); + if (ref == null) + { + final GlobalTidy tidy = new GlobalTidy(sstable); + ref = new Ref<>(tidy, tidy); + Ref<GlobalTidy> ex = lookup.putIfAbsent(descriptor, ref); + if (ex == null) + return ref; + ref = ex; + } + + Ref<GlobalTidy> newRef = ref.tryRef(); + if (newRef != null) + return newRef; + + // raced with tidy + lookup.remove(descriptor, ref); } - return refc; } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java index 3386c23..6ca74f0 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java @@ -39,12 +39,10 @@ import org.apache.cassandra.utils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; -import java.nio.file.Paths; import java.util.Set; import java.util.concurrent.TimeUnit; diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 7929059..889547d 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -25,7 +25,7 @@ import java.util.*; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; -import org.apache.cassandra.io.util.File; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 91889a7..e57a50e 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -28,7 +28,6 @@ import java.util.zip.CRC32; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java index a490ff6..e56b7b0 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java @@ -243,6 +243,9 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus @Override public void close() throws IOException { + if (buffer == null) + return; + doFlush(0); channel.close(); FileUtils.clean(buffer); diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java index 6d3ae7c..6bab460 100644 --- a/src/java/org/apache/cassandra/io/util/FileHandle.java +++ b/src/java/org/apache/cassandra/io/util/FileHandle.java @@ -148,8 +148,16 @@ public class FileHandle extends SharedCloseableImpl public FileDataInput createReader(long position) { RandomAccessReader reader = createReader(); - reader.seek(position); - return reader; + try + { + reader.seek(position); + return reader; + } + catch (Throwable t) + { + try { reader.close(); } catch (Throwable t2) { t.addSuppressed(t2); } + throw t; + } } /** diff --git a/src/java/org/apache/cassandra/io/util/PathUtils.java b/src/java/org/apache/cassandra/io/util/PathUtils.java index 26f9dcc..690222f 100644 --- a/src/java/org/apache/cassandra/io/util/PathUtils.java +++ b/src/java/org/apache/cassandra/io/util/PathUtils.java @@ 
-41,8 +41,6 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.NoSpamLogger; import static java.nio.file.StandardOpenOption.*; -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.WRITE; import static java.util.Collections.unmodifiableSet; import static org.apache.cassandra.utils.Throwables.merge; diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 9e3f5f6..bcd3070 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -225,7 +225,7 @@ public enum Verb private final Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer; private final Supplier<? extends IVerbHandler<?>> handler; - final Verb responseVerb; + public final Verb responseVerb; private final ToLongFunction<TimeUnit> expiration; diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java index 249d1a4..3d21702 100644 --- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java +++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java @@ -28,7 +28,6 @@ import java.util.Optional; import com.google.common.collect.Lists; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.RepairResult; import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.SyncStat; diff --git a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java index a539686..9f17578 100644 --- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java @@ -310,7 +310,10 @@ public final class SystemDistributedKeyspace keyspaceName, cfname, id.toString()); - processSilent(fmtQry, 
t.getMessage(), sw.toString()); + String message = t.getMessage(); + if (message == null) + message = t.getClass().getName(); + processSilent(fmtQry, message, sw.toString()); } public static void startViewBuild(String keyspace, String view, UUID hostId) diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index 5cb938b..b8fc082 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -54,10 +54,8 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JavaUtils; -import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.SigarLibrary; -import static java.lang.String.format; import static org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT; import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION; import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME; diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index dc3b878..44d757b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -140,7 +140,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.cassandra.concurrent.FutureTask.callable; import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS; import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK; import static 
org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY; diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 05fa595..134e312 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -24,6 +24,8 @@ package org.apache.cassandra.service.paxos; import java.io.IOException; import java.util.UUID; +import javax.annotation.Nullable; + import com.google.common.base.Objects; import org.apache.cassandra.schema.TableMetadata; @@ -112,6 +114,38 @@ public class Commit return String.format("Commit(%s, %s)", ballot, update); } + /** + * @return testIsAfter.isAfter(testIsBefore), with non-null > null + */ + public static boolean isAfter(@Nullable Commit testIsAfter, @Nullable Commit testIsBefore) + { + return testIsAfter != null && testIsAfter.isAfter(testIsBefore); + } + + /** + * @return testIsAfter.isAfter(testIsBefore), with non-null > null + */ + public static boolean isAfter(@Nullable UUID testIsAfter, @Nullable Commit testIsBefore) + { + return testIsAfter != null && (testIsBefore == null || testIsAfter.timestamp() > testIsBefore.ballot.timestamp()); + } + + /** + * @return testIsAfter.isAfter(testIsBefore), with non-null > null + */ + public static boolean isAfter(@Nullable Commit testIsAfter, @Nullable UUID testIsBefore) + { + return testIsAfter != null && (testIsBefore == null || testIsAfter.ballot.timestamp() > testIsBefore.timestamp()); + } + + /** + * @return testIsAfter.isAfter(testIsBefore), with non-null > null + */ + public static boolean isAfter(@Nullable UUID testIsAfter, @Nullable UUID testIsBefore) + { + return testIsAfter != null && (testIsBefore == null || testIsAfter.timestamp() > testIsBefore.timestamp()); + } + public static class CommitSerializer implements IVersionedSerializer<Commit> { public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException 
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index f15dfdb..4d57ef4 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -53,9 +53,9 @@ public class PaxosState } } - private final Commit promised; - private final Commit accepted; - private final Commit mostRecentCommit; + public final Commit promised; + public final Commit accepted; + public final Commit mostRecentCommit; public PaxosState(DecoratedKey key, TableMetadata metadata) { diff --git a/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java b/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java index d8d900a..e1bd4df 100644 --- a/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java +++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java @@ -30,6 +30,10 @@ import org.apache.cassandra.config.Duration; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileInputStreamPlus; +import org.apache.cassandra.io.util.FileOutputStreamPlus; + +import static org.apache.cassandra.io.util.File.WriteMode.OVERWRITE; // Only serialize fields @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, @@ -84,12 +88,18 @@ public class SnapshotManifest public void serializeToJsonFile(File outputFile) throws IOException { - mapper.writeValue(outputFile.toJavaIOFile(), this); + try (FileOutputStreamPlus out = outputFile.newOutputStream(OVERWRITE)) + { + mapper.writeValue((OutputStream) out, this); + } } public static SnapshotManifest deserializeFromJsonFile(File file) throws IOException { - return mapper.readValue(file.toJavaIOFile(), SnapshotManifest.class); + try (FileInputStreamPlus in = file.newInputStream()) + { + return 
mapper.readValue((InputStream) in, SnapshotManifest.class); + } } @Override diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 6bcd074..f7a0b63 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -21,6 +21,8 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.utils.concurrent.AsyncFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +73,8 @@ public final class StreamResultFuture extends AsyncFuture<StreamState> trySuccess(getCurrentState()); } - private StreamResultFuture(UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind) + @VisibleForTesting + public StreamResultFuture(UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind) { this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, streamingFactory(), true, false, pendingRepair, previewKind)); } diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 4e1f3e1..bcf19d5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -890,6 +890,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber } /** + * Call back on a timeout with the peer: logs the error and closes the session as {@code State.FAILED}. + */ + public synchronized void sessionTimeout() + { + logger.error("[Stream #{}] timeout with {}.", planId(), peer.toString()); + closeSession(State.FAILED); + } + + /** * @return Current snapshot of this session info. 
*/ public SessionInfo getSessionInfo() diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 70ad7d8..980193b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -33,6 +32,7 @@ import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; @@ -46,7 +46,7 @@ import static org.apache.cassandra.utils.ExecutorUtils.shutdown; public class StreamTransferTask extends StreamTask { private static final Logger logger = LoggerFactory.getLogger(StreamTransferTask.class); - private static final ScheduledExecutorService timeoutExecutor = executorFactory().scheduled("StreamingTransferTaskTimeouts"); + private static final ScheduledExecutorPlus timeoutExecutor = executorFactory().scheduled("StreamingTransferTaskTimeouts"); private final AtomicInteger sequenceNumber = new AtomicInteger(0); private boolean aborted = false; @@ -100,6 +100,26 @@ public class StreamTransferTask extends StreamTask session.taskCompleted(this); } + /** + * Timeout fired for stream at {@code sequenceNumber}. 
+ * + * @param sequenceNumber sequence number of stream + */ + public void timeout(int sequenceNumber) + { + synchronized (this) + { + timeoutTasks.remove(sequenceNumber); + OutgoingStreamMessage stream = streams.remove(sequenceNumber); + if (stream == null) return; + stream.complete(); + + logger.debug("timeout sequenceNumber {}, remaining files {}", sequenceNumber, streams.keySet()); + } + + session.sessionTimeout(); + } + public synchronized void abort() { if (aborted) @@ -169,19 +189,7 @@ public class StreamTransferTask extends StreamTask if (!streams.containsKey(sequenceNumber)) return null; - ScheduledFuture future = timeoutExecutor.schedule(new Runnable() - { - public void run() - { - synchronized (StreamTransferTask.this) - { - // remove so we don't cancel ourselves - timeoutTasks.remove(sequenceNumber); - StreamTransferTask.this.complete(sequenceNumber); - } - } - }, time, unit); - + ScheduledFuture future = timeoutExecutor.scheduleTimeoutWithDelay(() -> StreamTransferTask.this.timeout(sequenceNumber), time, unit); ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future); assert prev == null; return future; diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java index 0a7a470..4f3a443 100644 --- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java +++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java @@ -59,6 +59,7 @@ import static org.apache.cassandra.config.Config.PROPERTY_PREFIX; import static org.apache.cassandra.streaming.StreamSession.createLogTag; import static org.apache.cassandra.streaming.messages.StreamMessage.serialize; import static org.apache.cassandra.streaming.messages.StreamMessage.serializedSize; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors; import static 
org.apache.cassandra.utils.JVMStabilityInspector.inspectThrowable; import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java index 079c08b..cd77d4f 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java +++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.cassandra.config.EncryptionOptions; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.OutboundConnectionSettings; import org.apache.cassandra.streaming.StreamingChannel; import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory; diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java index 8506ce5..3df4314 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java +++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java @@ -18,12 +18,10 @@ package org.apache.cassandra.utils; import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.obs.IBitSet; import org.apache.cassandra.utils.obs.OffHeapBitSet; diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 912d907..58e66ec 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -44,7 +44,7 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import 
com.google.common.base.Joiner; -import com.google.common.base.Strings; + import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.concurrent.*; import org.apache.commons.lang3.StringUtils; diff --git a/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java b/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java index 86b6510..b3ce484 100644 --- a/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java +++ b/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; -import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; diff --git a/src/java/org/apache/cassandra/utils/concurrent/Threads.java b/src/java/org/apache/cassandra/utils/concurrent/Threads.java index 439a77f..f02519a 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Threads.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Threads.java @@ -132,4 +132,9 @@ public class Threads return Stream.of(st).collect(new StackTraceCombiner(printBriefPackages, prefix, delimiter, suffix)); } + public static String prettyPrint(Stream<StackTraceElement> st, boolean printBriefPackages, String prefix, String delimiter, String suffix) + { + return st.collect(new StackTraceCombiner(printBriefPackages, prefix, delimiter, suffix)); + } + } diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java index 8b0550f..ae89594 100644 --- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java @@ -18,7 +18,6 @@ package org.apache.cassandra.utils.obs; import java.io.*; -import java.io.IOException; import com.google.common.annotations.VisibleForTesting; diff 
--git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index b409c88..2a02d26 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@ -111,7 +111,7 @@ public class Coordinator implements ICoordinator null, ProtocolVersion.CURRENT, null), - System.nanoTime()); + nanoTime()); // Collect warnings reported during the query. CoordinatorWarnings.done(); if (res != null) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Query.java b/test/distributed/org/apache/cassandra/distributed/impl/Query.java index 7950d75..823113f 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Query.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Query.java @@ -35,6 +35,8 @@ import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; + public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]> { private static final long serialVersionUID = 1L; @@ -81,7 +83,7 @@ public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]> null, timestamp, FBUtilities.nowInSeconds()), - System.nanoTime()); + nanoTime()); // Collect warnings reported during the query. 
if (res != null) diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index f061e51..8e026fd 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -95,6 +95,7 @@ public class StreamTransferTaskTest { InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, FACTORY, null, current_version, false, 0, UUID.randomUUID(), PreviewKind.ALL); + session.init(new StreamResultFuture(UUID.randomUUID(), StreamOperation.OTHER, UUID.randomUUID(), PreviewKind.NONE)); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org