kevinrr888 commented on code in PR #6025:
URL: https://github.com/apache/accumulo/pull/6025#discussion_r2624772211
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java:
##########
@@ -76,6 +78,7 @@
import com.google.common.base.Preconditions;
+@NotThreadSafe
class RFileScanner extends ScannerOptions implements Scanner {
Review Comment:
Scanners are not thread safe objects
##########
server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java:
##########
@@ -386,6 +388,7 @@ private void releaseReaders(KeyExtent tablet,
List<FileSKVIterator> readers,
}
+ @NotThreadSafe
static class FileDataSource implements DataSource {
Review Comment:
A supporting class for SourceSwitchingIterator, an iterator that is not
thread safe. The impl shows this is intended for single-threaded access
(e.g., no volatile, sync, etc.).
##########
server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java:
##########
@@ -67,6 +69,7 @@ public static MetadataTime maxMetadataTime(MetadataTime mv1,
MetadataTime mv2) {
return mv1.compareTo(mv2) < 0 ? mv2 : mv1;
}
+ @NotThreadSafe
static class MillisTime extends TabletTime {
Review Comment:
Unsure if this is supposed to be thread safe... Based on impl it's not, but
unsure if that's by design. LogicalTime does appear to be thread-safe but had a
bug in useMaxTimeFromWALog, fixed here.
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java:
##########
@@ -355,6 +357,7 @@ private String getTableInfo() {
return context.getPrintableTableInfoFromId(tableId);
}
+ @NotThreadSafe
private class QueryTask implements Runnable {
Review Comment:
QueryTask is not used as/expected to be a thread-safe object
##########
core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedInputStream.java:
##########
@@ -23,10 +23,13 @@
import java.io.IOException;
import java.io.InputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+
/**
* Reader corresponding to BlockedOutputStream. Expects all data to be in the form of size (int)
* data (size bytes) junk (however many bytes it takes to complete a block)
*/
+@NotThreadSafe
public class BlockedInputStream extends InputStream {
Review Comment:
BlockedInputStream is a wrapper for DataInputStream, which is not thread safe
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java:
##########
@@ -111,7 +112,7 @@ static class ReadLock implements Lock {
QueueLock qlock;
byte[] userData;
- long entry = -1;
+ AtomicLong entry;
Review Comment:
This class is expected to be thread safe
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -23,12 +23,15 @@
import java.io.IOException;
import java.io.InputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
* This class is like byte array input stream with two differences. It supports seeking and avoids
* synchronization.
*/
+@NotThreadSafe
public class SeekableByteArrayInputStream extends InputStream {
Review Comment:
This class is clearly not meant to be thread safe: "It ... avoids
synchronization."
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java:
##########
@@ -46,6 +48,7 @@
import org.apache.accumulo.core.util.TextUtil;
import org.apache.hadoop.io.Text;
+@NotThreadSafe
public class ScannerOptions implements ScannerBase {
Review Comment:
Scanners are not thread safe
##########
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java:
##########
@@ -22,10 +22,13 @@
import java.io.IOException;
import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+
/**
* A simplified BufferedOutputStream with borrowed buffer, and allow users to see how much data have
* been buffered.
*/
+@NotThreadSafe
class SimpleBufferedOutputStream extends FilterOutputStream {
Review Comment:
The fact that this is a stream and the impl itself show this is not a
thread safe class
##########
core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java:
##########
@@ -43,6 +45,7 @@
public class MapFileOperations extends FileOperations {
private static final String MSG = "Map files are not supported";
+ @NotThreadSafe
public static class RangeIterator implements FileSKVIterator {
Review Comment:
accumulo iterators are not expected to be thread safe
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java:
##########
@@ -577,6 +579,7 @@ public TransportPoolShutdownException(String msg) {
private static final long serialVersionUID = 1L;
}
+ @NotThreadSafe
private static class CachedTTransport extends TTransport {
Review Comment:
CachedTTransport is only modified by a single thread at a time (must be
reserved()/unreserved())
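
A minimal sketch of the reserve/unreserve idea described above, under stated assumptions: `ReservationSketch`, `Resource`, and `demo` are hypothetical names, and this is not how `ThriftTransportPool` is implemented. It only illustrates why an object can stay unsynchronized when ownership is handed to one thread at a time through a thread-safe hand-off (here a `BlockingQueue`).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical pool holding one non-thread-safe resource; not the ThriftTransportPool code. */
class ReservationSketch {
  /** Plain mutable state, safe only while a single thread holds the reservation. */
  static class Resource {
    int uses = 0;
  }

  private final BlockingQueue<Resource> idle = new ArrayBlockingQueue<>(1);

  ReservationSketch() {
    idle.add(new Resource());
  }

  /** Blocks until the resource is free, then hands exclusive ownership to the caller. */
  Resource reserve() throws InterruptedException {
    return idle.take();
  }

  /** Returns the resource; the queue hand-off publishes the caller's writes to the next owner. */
  void unreserve(Resource r) {
    idle.add(r);
  }

  /** Runs several threads that each reserve, mutate, and unreserve the same resource. */
  static int demo(int threads, int iterations) {
    ReservationSketch pool = new ReservationSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        try {
          for (int j = 0; j < iterations; j++) {
            Resource r = pool.reserve();
            r.uses++; // unsynchronized, but only the reserving thread touches it
            pool.unreserve(r);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workers[i].start();
    }
    try {
      for (Thread w : workers) {
        w.join();
      }
      return pool.reserve().uses;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

Calling `ReservationSketch.demo(4, 1000)` counts every increment even though `Resource` has no synchronization of its own, because at most one thread owns it between `reserve()` and `unreserve()`.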
##########
core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java:
##########
@@ -21,13 +21,16 @@
import java.io.IOException;
import java.io.InputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+
import org.apache.hadoop.fs.Seekable;
/**
* BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a
* regular input stream. One can create multiple BoundedRangeFileInputStream on top of the same
* FSDataInputStream and they would not interfere with each other.
*/
+@NotThreadSafe
public class BoundedRangeFileInputStream extends InputStream {
Review Comment:
Desc and impl show multiple threads can have their own
BoundedRangeFileInputStream for same underlying data, but not that they can
share same BoundedRangeFileInputStream.
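
To illustrate the distinction, here is a rough sketch with a hypothetical `RangeStreamSketch` class. It assumes the shared data is an immutable byte array, which is simpler than a real `FSDataInputStream` (that has its own position), so this is only an analogy: each thread gets its own stream with its own unsynchronized cursor over the same underlying data.

```java
import java.io.InputStream;

/** Hypothetical bounded view over a shared, read-only byte array. */
class RangeStreamSketch extends InputStream {
  private final byte[] data; // shared between streams, never written
  private final int end;
  private int pos; // per-stream cursor, deliberately unsynchronized

  RangeStreamSketch(byte[] data, int offset, int length) {
    this.data = data;
    this.pos = offset;
    this.end = offset + length;
  }

  @Override
  public int read() {
    return pos < end ? (data[pos++] & 0xff) : -1;
  }

  /** Two threads, each with its OWN stream over the same array, sum their ranges. */
  static int[] demo() {
    byte[] shared = {1, 2, 3, 4, 5, 6};
    RangeStreamSketch[] streams =
        {new RangeStreamSketch(shared, 0, 3), new RangeStreamSketch(shared, 3, 3)};
    int[] sums = new int[2];
    Thread[] readers = new Thread[2];
    for (int i = 0; i < 2; i++) {
      final int idx = i;
      readers[i] = new Thread(() -> {
        int b;
        while ((b = streams[idx].read()) != -1) {
          sums[idx] += b; // each thread writes only its own slot
        }
      });
      readers[i].start();
    }
    try {
      for (Thread t : readers) {
        t.join(); // join makes each reader's writes visible to the caller
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sums;
  }
}
```

Sharing a single `RangeStreamSketch` between the two threads would race on `pos`, which is the case the annotation is meant to flag.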
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java:
##########
@@ -534,6 +536,7 @@ public void delete() {
}
}
+ @NotThreadSafe
private static class NMSKVIter implements InterruptibleIterator {
Review Comment:
Iterators are not thread safe
##########
core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/ColumnFamilySkippingIterator.java:
##########
@@ -36,6 +38,7 @@
import org.apache.accumulo.core.iterators.ServerSkippingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+@NotThreadSafe
public class ColumnFamilySkippingIterator extends ServerSkippingIterator
implements InterruptibleIterator {
Review Comment:
accumulo iterators are not expected to be thread safe
##########
core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java:
##########
@@ -34,24 +33,19 @@
import org.apache.accumulo.core.iterators.ServerWrappingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+@NotThreadSafe
public class StatsIterator extends ServerWrappingIterator {
Review Comment:
accumulo iterators are not expected to be thread safe. The counters here are
shared, but those are safely atomic. Internal state "numRead" is not and is not
expected to be. deepCopy() shows intended use.
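
A simplified sketch of the pattern described here, assuming a hypothetical `CountingSourceSketch` class that does not implement the real Accumulo iterator interfaces: the shared counter is an `AtomicLong`, the per-instance `numRead` is a plain field, and `deepCopy()` gives each thread its own instance.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for an iterator with shared atomic counters and private state. */
class CountingSourceSketch {
  private final AtomicLong sharedReadCounter; // shared by all copies, safe to update concurrently
  private long numRead = 0; // per-instance, plain field: one thread per instance

  CountingSourceSketch(AtomicLong sharedReadCounter) {
    this.sharedReadCounter = sharedReadCounter;
  }

  void next() {
    numRead++;
  }

  /** Folds the private count into the shared counter. */
  void report() {
    sharedReadCounter.addAndGet(numRead);
    numRead = 0;
  }

  /** Each thread is expected to work on its own copy, which shares only the atomic counter. */
  CountingSourceSketch deepCopy() {
    return new CountingSourceSketch(sharedReadCounter);
  }

  static long demo() {
    AtomicLong shared = new AtomicLong();
    CountingSourceSketch original = new CountingSourceSketch(shared);
    CountingSourceSketch copy = original.deepCopy();
    Thread t1 = new Thread(() -> {
      for (int i = 0; i < 1000; i++) {
        original.next();
      }
      original.report();
    });
    Thread t2 = new Thread(() -> {
      for (int i = 0; i < 1000; i++) {
        copy.next();
      }
      copy.report();
    });
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return shared.get();
  }
}
```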
##########
core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java:
##########
@@ -346,6 +348,7 @@ public void close() {
}
}
+ @NotThreadSafe
public static class Reader implements FileSKVIterator {
Review Comment:
accumulo iterators are not expected to be thread safe
##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -754,6 +757,7 @@ public long getLength() {
}
}
+ @NotThreadSafe
private static class LocalityGroupReader extends LocalityGroup implements
FileSKVIterator {
Review Comment:
accumulo iterators are not expected to be thread safe
##########
core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java:
##########
@@ -89,6 +91,7 @@
*/
public class TabletsMetadata implements Iterable<TabletMetadata>,
AutoCloseable {
+ @NotThreadSafe
public static class Builder implements TableRangeOptions, TableOptions,
RangeOptions, Options {
Review Comment:
impl clearly shows that this is not intended to be thread safe (no volatile,
no synchronization)
##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -159,6 +161,7 @@ void start(Path bulkDir, Manager manager, long tid, boolean
setTime) throws Exce
abstract long finish() throws Exception;
}
+ @NotThreadSafe
static class OnlineLoader extends Loader {
Review Comment:
OnlineLoader is part of LoadFiles which is a Repo. Repos are executed by a
single thread at a time
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java:
##########
@@ -412,7 +412,7 @@ public int getOpenFiles() {
public static class AssignmentWatcher implements Runnable {
private static final Logger log =
LoggerFactory.getLogger(AssignmentWatcher.class);
- private static long longAssignments = 0;
+ private static volatile long longAssignments = 0;
Review Comment:
Set in one thread but read in another, so it needs to be volatile
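
A small sketch of the visibility concern, using a hypothetical `VolatileFlagSketch` class rather than the real `AssignmentWatcher`: one thread writes the field and another polls it. With `volatile`, the Java memory model guarantees the write becomes visible to the polling thread; without it, the loop is not guaranteed to observe the update.

```java
/** Hypothetical writer/reader pair sharing one static field. */
class VolatileFlagSketch {
  // volatile: a write in one thread is guaranteed to become visible to reads in another
  private static volatile long longAssignments = 0;

  static long demo() {
    longAssignments = 0;
    Thread watcher = new Thread(() -> longAssignments = 3);
    watcher.start();
    // Poll from the calling thread; the deadline only guards the demo against hanging.
    long deadline = System.nanoTime() + 5_000_000_000L;
    while (longAssignments == 0 && System.nanoTime() < deadline) {
      Thread.onSpinWait();
    }
    return longAssignments;
  }
}
```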
##########
server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java:
##########
@@ -130,14 +130,14 @@ public static void main(String[] args) throws Exception {
}
private final AtomicLong lastRecalc = new AtomicLong(0L);
- private double totalIngestRate = 0.0;
- private double totalQueryRate = 0.0;
- private double totalScanRate = 0.0;
- private long totalEntries = 0L;
- private int totalTabletCount = 0;
- private long totalHoldTime = 0;
- private long totalLookups = 0;
- private int totalTables = 0;
+ private volatile double totalIngestRate = 0.0;
+ private volatile double totalQueryRate = 0.0;
+ private volatile double totalScanRate = 0.0;
+ private volatile long totalEntries = 0L;
+ private volatile int totalTabletCount = 0;
+ private volatile long totalHoldTime = 0;
+ private volatile long totalLookups = 0;
+ private volatile int totalTables = 0;
Review Comment:
These are all set periodically in a Monitor thread, but also retrieved from
other threads so these should be volatile
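
For context, `volatile` gives each field its own visibility guarantee (and makes `long` and `double` reads and writes atomic), but it does not make the group of fields consistent with each other. If readers ever needed totals from the same recalculation, one possible alternative, which is not what this PR does, is to publish an immutable snapshot through a single volatile reference. The class and field names below are hypothetical.

```java
/** Hypothetical holder that publishes all totals together. */
class MonitorTotalsSketch {
  /** Immutable snapshot: all fields final, set once in the constructor. */
  static final class Totals {
    final double ingestRate;
    final long entries;

    Totals(double ingestRate, long entries) {
      this.ingestRate = ingestRate;
      this.entries = entries;
    }
  }

  // A single volatile reference: readers see either the old snapshot or the new one, never a mix.
  private volatile Totals totals = new Totals(0.0, 0L);

  /** Called by the periodic recalculation thread. */
  void recalc(double ingestRate, long entries) {
    totals = new Totals(ingestRate, entries);
  }

  /** Called by any reader thread. */
  Totals current() {
    return totals;
  }
}
```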
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java:
##########
@@ -58,10 +59,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
-import io.opentelemetry.api.trace.Span;
-
+@NotThreadSafe
class ScanDataSource implements DataSource {
Review Comment:
A supporting class for iterators which are not thread safe, and impl shows
this is intended for single threaded access.
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java:
##########
@@ -24,23 +24,22 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.concurrent.NotThreadSafe;
+
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.trace.ScanInstrumentation;
-import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ShutdownUtil;
-import org.apache.accumulo.tserver.scan.NextBatchTask;
import org.apache.accumulo.tserver.scan.ScanParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+@NotThreadSafe
public class Scanner {
Review Comment:
Scanners are not thread safe
##########
server/base/src/main/java/org/apache/accumulo/server/compaction/CountingIterator.java:
##########
@@ -23,12 +23,15 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.NotThreadSafe;
+
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
+@NotThreadSafe
public class CountingIterator extends WrappingIterator {
Review Comment:
accumulo iterators are not thread safe
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java:
##########
@@ -531,6 +533,7 @@ public synchronized long getNumEntries() {
private final Set<MemoryIterator> activeIters =
Collections.synchronizedSet(new HashSet<>());
+ @NotThreadSafe
class MemoryDataSource implements DataSource {
Review Comment:
A supporting class for iterators which are not thread safe, and impl shows
this is intended for single threaded access (e.g., no volatile, sync, etc).
##########
server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java:
##########
@@ -33,6 +35,7 @@
import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
import org.apache.accumulo.server.ServerContext;
+@NotThreadSafe
public class ProblemReportingIterator implements InterruptibleIterator {
Review Comment:
accumulo iterators are not thread safe
##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -557,6 +559,7 @@ public void close() throws IOException {
}
}
+ @NotThreadSafe
public static class Writer implements FileSKVWriter {
Review Comment:
impl clearly expects single threaded access (no volatiles or sync (other
than close))
##########
server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java:
##########
@@ -91,6 +93,7 @@
/**
* A single garbage collection performed on a table (Root, MD) or all User tables.
*/
+@NotThreadSafe
public class GCRun implements GarbageCollectionEnvironment {
Review Comment:
Based on class description and implementation, not thread safe nor intended
to be
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java:
##########
@@ -226,8 +226,8 @@ enum CompactionState {
private final Rate ingestByteRate = new Rate(0.95);
private final Rate scannedRate = new Rate(0.95);
- private long lastMinorCompactionFinishTime = 0;
- private long lastMapFileImportTime = 0;
+ private volatile long lastMinorCompactionFinishTime = 0;
+ private volatile long lastMapFileImportTime = 0;
Review Comment:
lastMinorCompactionFinishTime is modified in MinorCompactionTask and also
read in MajorCompactor, so it needs to be volatile. lastMapFileImportTime is
read in MajorCompactor and MinorCompactionTask and modified in at least one
other thread (AssignmentTask), so it also needs to be volatile.
##########
test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java:
##########
@@ -37,6 +39,7 @@
* Iterator used in tests *and* the test class must spawn a new MAC instance for each test since the
* timesThrown variable is static.
*/
+@NotThreadSafe
public class ErrorThrowingIterator extends WrappingIterator {
Review Comment:
accumulo iterators are not thread safe
##########
server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java:
##########
@@ -478,6 +480,7 @@ private void assertNoCandidates(TestFileManager fileMgr,
Set<StoredTabletFile> t
}
+ @NotThreadSafe
static class TestFileManager extends CompactableImpl.FileManager {
Review Comment:
Test class used in single threaded test
##########
test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java:
##########
@@ -248,6 +250,7 @@ public int runTest() throws Exception {
threadPool.shutdown();
}
+ @NotThreadSafe
private abstract static class Test implements Runnable {
Review Comment:
These are updated in this "Test" thread and checked in main thread, but they
aren't checked until execution has completed, so fine not to use volatile or
sync.
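
A sketch of why this is safe, with hypothetical names and a plain `Thread` instead of the thread pool the real test uses: `Thread.join()` establishes a happens-before edge, so writes made by the worker are visible to the main thread once `join()` returns, even though the field is neither volatile nor synchronized.

```java
/** Hypothetical task whose plain field is read only after the worker has finished. */
class JoinVisibilitySketch {
  static class Task implements Runnable {
    long count = 0; // plain field: no volatile, no synchronization

    @Override
    public void run() {
      for (int i = 0; i < 1000; i++) {
        count++;
      }
    }
  }

  static long demo() {
    Task task = new Task();
    Thread worker = new Thread(task);
    worker.start();
    try {
      worker.join(); // everything the worker wrote is visible after join() returns
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return task.count;
  }
}
```

Reading `task.count` before `join()` returns would not have this guarantee.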