This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4550e073d9 IGNITE-21438 Add thread assertions to MV partition and index storages (#3149) 4550e073d9 is described below commit 4550e073d96a30c0adce0785440734f271d1c10e Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Wed Feb 7 13:36:04 2024 +0400 IGNITE-21438 Add thread assertions to MV partition and index storages (#3149) --- .../ignite/internal/thread/IgniteThread.java | 25 +++- .../ignite/internal/thread/ThreadAttributes.java | 40 +++++ .../ignite/internal/thread/ThreadOperation.java | 30 ++++ .../network/netty/NamedNioEventLoopGroup.java | 14 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 33 ++++- modules/storage-api/build.gradle | 1 + .../internal/storage/ThreadAssertingCursor.java | 56 +++++++ .../storage/ThreadAssertingMvPartitionStorage.java | 164 +++++++++++++++++++++ .../ThreadAssertingPartitionTimestampCursor.java | 48 ++++++ .../engine/ThreadAssertingMvTableStorage.java | 162 ++++++++++++++++++++ .../engine/ThreadAssertingStorageEngine.java | 62 ++++++++ .../index/ThreadAssertingHashIndexStorage.java | 51 +++++++ .../storage/index/ThreadAssertingIndexStorage.java | 76 ++++++++++ .../storage/index/ThreadAssertingPeekCursor.java | 47 ++++++ .../index/ThreadAssertingSortedIndexStorage.java | 52 +++++++ .../ignite/internal/worker/ThreadAssertions.java | 75 ++++++++++ 16 files changed, 924 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java index 53f2fbf5f3..1801aef277 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java @@ -17,6 +17,11 @@ package org.apache.ignite.internal.thread; +import static java.util.Collections.unmodifiableSet; + +import java.util.Collections; +import 
java.util.EnumSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.util.worker.IgniteWorker; @@ -24,18 +29,20 @@ import org.apache.ignite.internal.util.worker.IgniteWorker; /** * This class adds some necessary plumbing on top of the {@link Thread} class. Specifically, it adds: * <ul> - * <li>Consistent naming of threads</li>; - * <li>Name of the ignite node this thread belongs to</li>. + * <li>Consistent naming of threads;</li> + * <li>Name of the ignite node this thread belongs to.</li> * </ul> * <b>Note</b>: this class is intended for internal use only. */ -public class IgniteThread extends Thread { +public class IgniteThread extends Thread implements ThreadAttributes { /** Number of all ignite threads in the system. */ private static final AtomicLong THREAD_COUNTER = new AtomicLong(); /** The name of the Ignite instance this thread belongs to. */ protected final String igniteInstanceName; + private final Set<ThreadOperation> allowedOperations; + /** * Creates thread with given worker. * @@ -61,11 +68,16 @@ public class IgniteThread extends Thread { * @param nodeName Name of the Ignite instance this thread is created for. * @param threadName Name of thread. * @param r Runnable to execute. + * @param allowedOperations Operations which this thread allows to execute. */ - public IgniteThread(String nodeName, String threadName, Runnable r) { + public IgniteThread(String nodeName, String threadName, Runnable r, ThreadOperation... 
allowedOperations) { super(r, createName(THREAD_COUNTER.incrementAndGet(), threadName, nodeName)); this.igniteInstanceName = nodeName; + + Set<ThreadOperation> operations = EnumSet.noneOf(ThreadOperation.class); + Collections.addAll(operations, allowedOperations); + this.allowedOperations = unmodifiableSet(operations); } /** @@ -113,4 +125,9 @@ public class IgniteThread extends Thread { public String toString() { return S.toString(IgniteThread.class, this, "name", getName()); } + + @Override + public Set<ThreadOperation> allowedOperations() { + return allowedOperations; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java new file mode 100644 index 0000000000..230095c38b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread; + +import java.util.Set; + +/** + * Holds some thread attributes. 
+ */ +@SuppressWarnings("InterfaceMayBeAnnotatedFunctional") +public interface ThreadAttributes { + /** + * Returns all operations that this thread allows to execute. + */ + Set<ThreadOperation> allowedOperations(); + + /** + * Returns {@code true} if the given operation is allowed by this thread. + * + * @param operation Operation to check. + */ + default boolean allows(ThreadOperation operation) { + return allowedOperations().contains(operation); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java new file mode 100644 index 0000000000..62586d4397 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread; + +/** + * Operation that a thread might be allowed or denied to execute. + */ +public enum ThreadOperation { + /** Storage read. */ + STORAGE_READ, + /** Storage write. */ + STORAGE_WRITE, + /** Make a blocking wait (involving taking a lock or waiting on a condition variable or waiting for time to pass). 
*/ + WAIT +} diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java index d0853bfda9..2ed0a4691f 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java @@ -17,11 +17,16 @@ package org.apache.ignite.internal.network.netty; +import static java.util.Collections.emptySet; + import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.FastThreadLocalThread; +import java.util.Set; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.thread.ThreadAttributes; +import org.apache.ignite.internal.thread.ThreadOperation; /** * Named netty event loop. @@ -59,7 +64,9 @@ public class NamedNioEventLoopGroup extends NioEventLoopGroup { /** * Marker class for network threads. Basically is just a {@link FastThreadLocalThread}. */ - public static class NetworkThread extends FastThreadLocalThread { + public static class NetworkThread extends FastThreadLocalThread implements ThreadAttributes { + private static final Set<ThreadOperation> ALLOWED_OPERATIONS = emptySet(); + /** * Constructor. 
* @@ -70,5 +77,10 @@ public class NamedNioEventLoopGroup extends NioEventLoopGroup { public NetworkThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); } + + @Override + public Set<ThreadOperation> allowedOperations() { + return ALLOWED_OPERATIONS; + } } } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index e245959970..ed784318cc 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -27,7 +27,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -157,6 +160,8 @@ import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.DataStorageModule; import org.apache.ignite.internal.storage.DataStorageModules; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.systemview.api.SystemViewManager; import org.apache.ignite.internal.table.distributed.TableManager; @@ -180,6 +185,7 @@ import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.VaultService; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.internal.worker.CriticalWorkerWatchdog; +import org.apache.ignite.internal.worker.ThreadAssertions; import 
org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; @@ -553,14 +559,13 @@ public class IgniteImpl implements Ignite { GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY); - dataStorageMgr = new DataStorageManager( - dataStorageModules.createStorageEngines( - name, - nodeConfigRegistry, - storagePath, - longJvmPauseDetector - ) + Map<String, StorageEngine> storageEngines = dataStorageModules.createStorageEngines( + name, + nodeConfigRegistry, + storagePath, + longJvmPauseDetector ); + dataStorageMgr = new DataStorageManager(applyThreadAssertionsIfNeeded(storageEngines)); volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout")); @@ -745,6 +750,20 @@ public class IgniteImpl implements Ignite { restComponent = createRestComponent(name); } + private static Map<String, StorageEngine> applyThreadAssertionsIfNeeded(Map<String, StorageEngine> storageEngines) { + if (!ThreadAssertions.enabled()) { + return storageEngines; + } + + Map<String, StorageEngine> decoratedEngines = new HashMap<>(); + + for (Entry<String, StorageEngine> entry : storageEngines.entrySet()) { + decoratedEngines.put(entry.getKey(), new ThreadAssertingStorageEngine(entry.getValue())); + } + + return Map.copyOf(decoratedEngines); + } + private static SameValueLongSupplier delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) { return new SameValueLongSupplier(() -> schemaSyncConfig.delayDuration().value()); } diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle index f64dd596df..5c4a1bc0a2 100644 --- a/modules/storage-api/build.gradle +++ b/modules/storage-api/build.gradle @@ -30,6 +30,7 @@ dependencies { implementation project(':ignite-configuration') implementation project(":ignite-core") implementation project(":ignite-catalog") + implementation 
project(":ignite-workers") implementation libs.jetbrains.annotations implementation libs.auto.service.annotations diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java new file mode 100644 index 0000000000..4f85219b25 --- /dev/null +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage; + +import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead; + +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.worker.ThreadAssertions; + +/** + * {@link Cursor} that performs thread assertions when doing read operations. + * + * @see ThreadAssertions + */ +public class ThreadAssertingCursor<T> implements Cursor<T> { + private final Cursor<T> cursor; + + /** Constructor. 
*/ + public ThreadAssertingCursor(Cursor<T> cursor) { + this.cursor = cursor; + } + + @Override + public void close() { + cursor.close(); + } + + @Override + public boolean hasNext() { + assertThreadAllowsToRead(); + + return cursor.hasNext(); + } + + @Override + public T next() { + assertThreadAllowsToRead(); + + return cursor.next(); + } +} diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java new file mode 100644 index 0000000000..5fddecd805 --- /dev/null +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage; + +import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead; +import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.gc.GcEntry; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.worker.ThreadAssertions; +import org.jetbrains.annotations.Nullable; + +/** + * {@link MvPartitionStorage} that performs thread assertions when doing read/write operations. + * + * @see ThreadAssertions + */ +public class ThreadAssertingMvPartitionStorage implements MvPartitionStorage { + private final MvPartitionStorage partitionStorage; + + /** Constructor. */ + public ThreadAssertingMvPartitionStorage(MvPartitionStorage partitionStorage) { + this.partitionStorage = partitionStorage; + } + + @Override + public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { + return partitionStorage.runConsistently(closure); + } + + @Override + public CompletableFuture<Void> flush() { + assertThreadAllowsToWrite(); + + return partitionStorage.flush(); + } + + @Override + public long lastAppliedIndex() { + return partitionStorage.lastAppliedIndex(); + } + + @Override + public long lastAppliedTerm() { + return partitionStorage.lastAppliedTerm(); + } + + @Override + public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException { + partitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm); + } + + @Override + public byte @Nullable [] committedGroupConfiguration() { + return partitionStorage.committedGroupConfiguration(); + } + + @Override + public void committedGroupConfiguration(byte[] config) { + partitionStorage.committedGroupConfiguration(config); + } + + @Override + 
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException { + assertThreadAllowsToRead(); + + return partitionStorage.read(rowId, timestamp); + } + + @Override + public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId) + throws TxIdMismatchException, StorageException { + assertThreadAllowsToWrite(); + + return partitionStorage.addWrite(rowId, row, txId, commitTableId, commitPartitionId); + } + + @Override + public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException { + assertThreadAllowsToWrite(); + + return partitionStorage.abortWrite(rowId); + } + + @Override + public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException { + assertThreadAllowsToWrite(); + + partitionStorage.commitWrite(rowId, timestamp); + } + + @Override + public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException { + assertThreadAllowsToWrite(); + + partitionStorage.addWriteCommitted(rowId, row, commitTimestamp); + } + + @Override + public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException { + assertThreadAllowsToRead(); + + return new ThreadAssertingCursor<>(partitionStorage.scanVersions(rowId)); + } + + @Override + public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException { + assertThreadAllowsToRead(); + + return new ThreadAssertingPartitionTimestampCursor(partitionStorage.scan(timestamp)); + } + + @Override + public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException { + assertThreadAllowsToRead(); + + return partitionStorage.closestRowId(lowerBound); + } + + @Override + public @Nullable GcEntry peek(HybridTimestamp lowWatermark) { + assertThreadAllowsToRead(); + + return partitionStorage.peek(lowWatermark); + } + + @Override + public @Nullable BinaryRow vacuum(GcEntry entry) { + assertThreadAllowsToWrite(); + + return 
partitionStorage.vacuum(entry); + } + + @Override + public long rowsCount() throws StorageException { + assertThreadAllowsToRead(); + + return partitionStorage.rowsCount(); + } + + @Override + public void close() { + partitionStorage.close(); + } +} diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java new file mode 100644 index 0000000000..17a8f32c86 --- /dev/null +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage; + +import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.worker.ThreadAssertions; +import org.jetbrains.annotations.Nullable; + +/** + * {@link PartitionTimestampCursor} that performs thread assertions when doing read operations. 
+ * + * @see ThreadAssertions + */ +public class ThreadAssertingPartitionTimestampCursor extends ThreadAssertingCursor<ReadResult> implements PartitionTimestampCursor { + private final PartitionTimestampCursor cursor; + + /** Constructor. */ + public ThreadAssertingPartitionTimestampCursor(PartitionTimestampCursor cursor) { + super(cursor); + + this.cursor = cursor; + } + + @Override + public @Nullable BinaryRow committed(HybridTimestamp timestamp) { + assertThreadAllowsToRead(); + + return cursor.committed(timestamp); + } +} diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java new file mode 100644 index 0000000000..97950786df --- /dev/null +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.ThreadAssertingMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.ThreadAssertingHashIndexStorage;
+import org.apache.ignite.internal.storage.index.ThreadAssertingSortedIndexStorage;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvTableStorage} decorator that asserts the current thread is allowed to write before each mutating
+ * operation and hands out partition and index storages wrapped in their thread-asserting counterparts, both when
+ * they are created and when they are merely looked up.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingMvTableStorage implements MvTableStorage {
+    /** Storage every call is delegated to. */
+    private final MvTableStorage tableStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param tableStorage Table storage to delegate to.
+     */
+    public ThreadAssertingMvTableStorage(MvTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Override
+    public void close() throws Exception {
+        tableStorage.close();
+    }
+
+    @Override
+    public CompletableFuture<MvPartitionStorage> createMvPartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        // The created partition storage is wrapped as soon as the delegate's future completes.
+        return tableStorage.createMvPartition(partitionId)
+                .thenApply(ThreadAssertingMvPartitionStorage::new);
+    }
+
+    @Override
+    public @Nullable MvPartitionStorage getMvPartition(int partitionId) {
+        MvPartitionStorage partitionStorage = tableStorage.getMvPartition(partitionId);
+
+        // Wrap looked-up storages too: returning the raw delegate would let callers skip the thread checks that
+        // createMvPartition() guarantees. A missing partition stays null.
+        return partitionStorage == null ? null : new ThreadAssertingMvPartitionStorage(partitionStorage);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.destroyPartition(partitionId);
+    }
+
+    @Override
+    public SortedIndexStorage getOrCreateSortedIndex(int partitionId, StorageSortedIndexDescriptor indexDescriptor) {
+        assertThreadAllowsToWrite();
+
+        SortedIndexStorage indexStorage = tableStorage.getOrCreateSortedIndex(partitionId, indexDescriptor);
+        return new ThreadAssertingSortedIndexStorage(indexStorage);
+    }
+
+    @Override
+    public HashIndexStorage getOrCreateHashIndex(int partitionId, StorageHashIndexDescriptor indexDescriptor) {
+        assertThreadAllowsToWrite();
+
+        HashIndexStorage indexStorage = tableStorage.getOrCreateHashIndex(partitionId, indexDescriptor);
+        return new ThreadAssertingHashIndexStorage(indexStorage);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroyIndex(int indexId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.destroyIndex(indexId);
+    }
+
+    @Override
+    public boolean isVolatile() {
+        return tableStorage.isVolatile();
+    }
+
+    @Override
+    public void start() throws StorageException {
+        tableStorage.start();
+    }
+
+    @Override
+    public void stop() throws StorageException {
+        tableStorage.stop();
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.destroy();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.startRebalancePartition(partitionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.abortRebalancePartition(partitionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalancePartition(
+            int partitionId,
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            byte[] groupConfig
+    ) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.finishRebalancePartition(partitionId, lastAppliedIndex, lastAppliedTerm, groupConfig);
+    }
+
+    @Override
+    public CompletableFuture<Void> clearPartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.clearPartition(partitionId);
+    }
+
+    @Override
+    public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
+        IndexStorage indexStorage = tableStorage.getIndex(partitionId, indexId);
+
+        // Mirror getOrCreateSortedIndex()/getOrCreateHashIndex(): a looked-up index gets the same wrapper as a
+        // freshly created one.
+        if (indexStorage instanceof SortedIndexStorage) {
+            return new ThreadAssertingSortedIndexStorage((SortedIndexStorage) indexStorage);
+        }
+
+        if (indexStorage instanceof HashIndexStorage) {
+            return new ThreadAssertingHashIndexStorage((HashIndexStorage) indexStorage);
+        }
+
+        // Either null (the delegate found nothing) or an index kind that has no thread-asserting wrapper.
+        return indexStorage;
+    }
+
+    @Override
+    public StorageTableDescriptor getTableDescriptor() {
+        return tableStorage.getTableDescriptor();
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
new file mode 100644
index 0000000000..d9f02b1773
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link StorageEngine} decorator that wraps every table storage it creates in a
+ * {@link ThreadAssertingMvTableStorage}. All other calls are passed to the wrapped engine unchanged.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingStorageEngine implements StorageEngine {
+    /** Engine that does the actual work. */
+    private final StorageEngine delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param storageEngine Engine to delegate to.
+     */
+    public ThreadAssertingStorageEngine(StorageEngine storageEngine) {
+        delegate = storageEngine;
+    }
+
+    @Override
+    public String name() {
+        return delegate.name();
+    }
+
+    @Override
+    public void start() throws StorageException {
+        delegate.start();
+    }
+
+    @Override
+    public void stop() throws StorageException {
+        delegate.stop();
+    }
+
+    @Override
+    public boolean isVolatile() {
+        return delegate.isVolatile();
+    }
+
+    @Override
+    public MvTableStorage createMvTable(StorageTableDescriptor tableDescriptor, StorageIndexDescriptorSupplier indexDescriptorSupplier) {
+        // The table storage produced by the delegate is never handed out unwrapped.
+        return new ThreadAssertingMvTableStorage(delegate.createMvTable(tableDescriptor, indexDescriptorSupplier));
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
new file mode
100644
index 0000000000..5e1ca534c1
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * Thread-asserting decorator for a {@link HashIndexStorage}: the operations shared by all index storages are
+ * inherited from {@link ThreadAssertingIndexStorage}, the hash-specific ones are handled here.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingHashIndexStorage extends ThreadAssertingIndexStorage implements HashIndexStorage {
+    /** The same storage the superclass delegates to, kept under its hash-specific type. */
+    private final HashIndexStorage hashIndexStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param indexStorage Hash index storage to delegate to.
+     */
+    public ThreadAssertingHashIndexStorage(HashIndexStorage indexStorage) {
+        super(indexStorage);
+
+        hashIndexStorage = indexStorage;
+    }
+
+    @Override
+    public StorageHashIndexDescriptor indexDescriptor() {
+        return hashIndexStorage.indexDescriptor();
+    }
+
+    @Override
+    public void destroy() throws StorageException {
+        // Destroying the index is a write operation, so the write permission is asserted first.
+        assertThreadAllowsToWrite();
+
+        hashIndexStorage.destroy();
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
new file mode 100644
index 0000000000..10560742f3
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.ThreadAssertingCursor;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class for thread-asserting {@link IndexStorage} decorators. It covers the operations common to all index
+ * storages: lookups assert read permission, modifications assert write permission, and each call is then passed
+ * to the wrapped storage.
+ *
+ * @see ThreadAssertions
+ */
+abstract class ThreadAssertingIndexStorage implements IndexStorage {
+    /** Storage that does the actual work. */
+    private final IndexStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param indexStorage Index storage to delegate to.
+     */
+    ThreadAssertingIndexStorage(IndexStorage indexStorage) {
+        delegate = indexStorage;
+    }
+
+    @Override
+    public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+        assertThreadAllowsToRead();
+
+        Cursor<RowId> rowIds = delegate.get(key);
+
+        // The cursor is wrapped as well, so the returned object is a ThreadAssertingCursor rather than the raw one.
+        return new ThreadAssertingCursor<>(rowIds);
+    }
+
+    @Override
+    public void put(IndexRow row) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        delegate.put(row);
+    }
+
+    @Override
+    public void remove(IndexRow row) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        delegate.remove(row);
+    }
+
+    @Override
+    public @Nullable RowId getNextRowIdToBuild() throws StorageException {
+        // Plain delegation: no thread assertion is made for this call.
+        return delegate.getNextRowIdToBuild();
+    }
+
+    @Override
+    public void setNextRowIdToBuild(@Nullable RowId rowId) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        delegate.setNextRowIdToBuild(rowId);
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
new file mode 100644
index 0000000000..8e084b642f
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.storage.ThreadAssertingCursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thread-asserting decorator for a {@link PeekCursor}: adds a read-permission assertion to {@link #peek()} on top
+ * of what {@link ThreadAssertingCursor} provides.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingPeekCursor<T> extends ThreadAssertingCursor<T> implements PeekCursor<T> {
+    /** The same cursor the superclass wraps, kept under its peek-capable type. */
+    private final PeekCursor<T> peekCursor;
+
+    /**
+     * Constructor.
+     *
+     * @param cursor Cursor to delegate to.
+     */
+    public ThreadAssertingPeekCursor(PeekCursor<T> cursor) {
+        super(cursor);
+
+        peekCursor = cursor;
+    }
+
+    @Override
+    public @Nullable T peek() {
+        assertThreadAllowsToRead();
+
+        return peekCursor.peek();
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
new file mode 100644
index 0000000000..0b61260cfc
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SortedIndexStorage} that performs thread assertions when doing read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingSortedIndexStorage extends ThreadAssertingIndexStorage implements SortedIndexStorage {
+    /** The same storage the superclass delegates to, kept under its sorted-specific type. */
+    private final SortedIndexStorage sortedIndexStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param indexStorage Sorted index storage to delegate to.
+     */
+    public ThreadAssertingSortedIndexStorage(SortedIndexStorage indexStorage) {
+        super(indexStorage);
+
+        sortedIndexStorage = indexStorage;
+    }
+
+    @Override
+    public StorageSortedIndexDescriptor indexDescriptor() {
+        return sortedIndexStorage.indexDescriptor();
+    }
+
+    @Override
+    public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+        assertThreadAllowsToRead();
+
+        PeekCursor<IndexRow> rows = sortedIndexStorage.scan(lowerBound, upperBound, flags);
+
+        // The scan cursor is wrapped too, so its peek() asserts read permission as well.
+        return new ThreadAssertingPeekCursor<>(rows);
+    }
+}
diff --git a/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java b/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
new file mode 100644
index 0000000000..64b54c2c35
--- /dev/null
+++ b/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.worker;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
+
+/**
+ * Tools to assert that the current thread allows to perform a requested operation.
+ *
+ * <p>This is a static-only utility class and cannot be instantiated.
+ */
+public final class ThreadAssertions {
+    /** Name of the system property that switches thread assertions on or off; an absent property means {@code true}. */
+    public static final String ENABLED_PROPERTY = "ignite.thread.assertions.enabled";
+
+    private static final IgniteLogger LOG = Loggers.forClass(ThreadAssertions.class);
+
+    /** Value of {@link #ENABLED_PROPERTY}, read once when this class is initialized. */
+    private static final boolean ENABLED = Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "true"));
+
+    /** Prevents instantiation: all members are static. */
+    private ThreadAssertions() {
+    }
+
+    /**
+     * Returns {@code true} if thread assertions are enabled.
+     *
+     * <p>The assertion methods of this class do not consult this flag themselves.
+     */
+    public static boolean enabled() {
+        return ENABLED;
+    }
+
+    /**
+     * Assert that the current thread allows to perform {@link ThreadOperation#STORAGE_WRITE} operations.
+     *
+     * <p>A violation is currently only logged as a warning; nothing is thrown.
+     */
+    public static void assertThreadAllowsToWrite() {
+        assertThreadAllowsTo(ThreadOperation.STORAGE_WRITE);
+    }
+
+    /**
+     * Assert that the current thread allows to perform {@link ThreadOperation#STORAGE_READ} operations.
+     *
+     * <p>A violation is currently only logged as a warning; nothing is thrown.
+     */
+    public static void assertThreadAllowsToRead() {
+        assertThreadAllowsTo(ThreadOperation.STORAGE_READ);
+    }
+
+    /**
+     * Checks the current thread against the requested operation and logs a warning when the thread either carries
+     * no {@link ThreadAttributes} at all or its attributes do not allow the operation.
+     *
+     * @param requestedOperation Operation the caller is about to perform.
+     */
+    private static void assertThreadAllowsTo(ThreadOperation requestedOperation) {
+        Thread currentThread = Thread.currentThread();
+
+        // TODO: IGNITE-21439 - actually throw AssertionError if the operation is not allowed.
+
+        if (!(currentThread instanceof ThreadAttributes)) {
+            LOG.warn("Thread {} does not have allowed operations", trackerException(), currentThread);
+
+            return;
+        }
+
+        if (!((ThreadAttributes) currentThread).allows(requestedOperation)) {
+            LOG.warn("Thread {} is not allowed to {}", trackerException(), currentThread, requestedOperation);
+        }
+    }
+
+    /**
+     * Creates an exception that is never thrown: it is passed to the logger so that the warning carries the stack
+     * trace of the offending call.
+     */
+    private static Exception trackerException() {
+        return new Exception("Tracker");
+    }
+}