This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4550e073d9 IGNITE-21438 Add thread assertions to MV partition and 
index storages (#3149)
4550e073d9 is described below

commit 4550e073d96a30c0adce0785440734f271d1c10e
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Wed Feb 7 13:36:04 2024 +0400

    IGNITE-21438 Add thread assertions to MV partition and index storages 
(#3149)
---
 .../ignite/internal/thread/IgniteThread.java       |  25 +++-
 .../ignite/internal/thread/ThreadAttributes.java   |  40 +++++
 .../ignite/internal/thread/ThreadOperation.java    |  30 ++++
 .../network/netty/NamedNioEventLoopGroup.java      |  14 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  33 ++++-
 modules/storage-api/build.gradle                   |   1 +
 .../internal/storage/ThreadAssertingCursor.java    |  56 +++++++
 .../storage/ThreadAssertingMvPartitionStorage.java | 164 +++++++++++++++++++++
 .../ThreadAssertingPartitionTimestampCursor.java   |  48 ++++++
 .../engine/ThreadAssertingMvTableStorage.java      | 162 ++++++++++++++++++++
 .../engine/ThreadAssertingStorageEngine.java       |  62 ++++++++
 .../index/ThreadAssertingHashIndexStorage.java     |  51 +++++++
 .../storage/index/ThreadAssertingIndexStorage.java |  76 ++++++++++
 .../storage/index/ThreadAssertingPeekCursor.java   |  47 ++++++
 .../index/ThreadAssertingSortedIndexStorage.java   |  52 +++++++
 .../ignite/internal/worker/ThreadAssertions.java   |  75 ++++++++++
 16 files changed, 924 insertions(+), 12 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
index 53f2fbf5f3..1801aef277 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.thread;
 
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.worker.IgniteWorker;
@@ -24,18 +29,20 @@ import org.apache.ignite.internal.util.worker.IgniteWorker;
 /**
  * This class adds some necessary plumbing on top of the {@link Thread} class. 
Specifically, it adds:
  * <ul>
- *      <li>Consistent naming of threads</li>;
- *      <li>Name of the ignite node this thread belongs to</li>.
+ *      <li>Consistent naming of threads;</li>
+ *      <li>Name of the ignite node this thread belongs to.</li>
  * </ul>
  * <b>Note</b>: this class is intended for internal use only.
  */
-public class IgniteThread extends Thread {
+public class IgniteThread extends Thread implements ThreadAttributes {
     /** Number of all ignite threads in the system. */
     private static final AtomicLong THREAD_COUNTER = new AtomicLong();
 
     /** The name of the Ignite instance this thread belongs to. */
     protected final String igniteInstanceName;
 
+    private final Set<ThreadOperation> allowedOperations;
+
     /**
      * Creates thread with given worker.
      *
@@ -61,11 +68,16 @@ public class IgniteThread extends Thread {
      * @param nodeName   Name of the Ignite instance this thread is created 
for.
      * @param threadName Name of thread.
      * @param r          Runnable to execute.
+     * @param allowedOperations Operations that this thread is allowed to execute.
      */
-    public IgniteThread(String nodeName, String threadName, Runnable r) {
+    public IgniteThread(String nodeName, String threadName, Runnable r, 
ThreadOperation... allowedOperations) {
         super(r, createName(THREAD_COUNTER.incrementAndGet(), threadName, 
nodeName));
 
         this.igniteInstanceName = nodeName;
+
+        Set<ThreadOperation> operations = 
EnumSet.noneOf(ThreadOperation.class);
+        Collections.addAll(operations, allowedOperations);
+        this.allowedOperations = unmodifiableSet(operations);
     }
 
     /**
@@ -113,4 +125,9 @@ public class IgniteThread extends Thread {
     public String toString() {
         return S.toString(IgniteThread.class, this, "name", getName());
     }
+
+    @Override
+    public Set<ThreadOperation> allowedOperations() {
+        return allowedOperations;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java
new file mode 100644
index 0000000000..230095c38b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.Set;
+
+/**
+ * Holds some thread attributes.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ThreadAttributes {
+    /**
+     * Returns all operations that this thread allows to execute.
+     */
+    Set<ThreadOperation> allowedOperations();
+
+    /**
+     * Returns {@code true} if the given operation is allowed by this thread.
+     *
+     * @param operation Operation to check.
+     */
+    default boolean allows(ThreadOperation operation) {
+        return allowedOperations().contains(operation);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
new file mode 100644
index 0000000000..62586d4397
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+/**
+ * Operation that a thread might be allowed or denied to execute.
+ */
+public enum ThreadOperation {
+    /** Storage read. */
+    STORAGE_READ,
+    /** Storage write. */
+    STORAGE_WRITE,
+    /** Make a blocking wait (involving taking a lock, waiting on a 
condition variable, or waiting for time to pass). */
+    WAIT
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
index d0853bfda9..2ed0a4691f 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
@@ -17,11 +17,16 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static java.util.Collections.emptySet;
+
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.concurrent.FastThreadLocalThread;
+import java.util.Set;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
 
 /**
  * Named netty event loop.
@@ -59,7 +64,9 @@ public class NamedNioEventLoopGroup extends NioEventLoopGroup 
{
     /**
      * Marker class for network threads. Basically is just a {@link 
FastThreadLocalThread}.
      */
-    public static class NetworkThread extends FastThreadLocalThread {
+    public static class NetworkThread extends FastThreadLocalThread implements 
ThreadAttributes {
+        private static final Set<ThreadOperation> ALLOWED_OPERATIONS = 
emptySet();
+
         /**
          * Constructor.
          *
@@ -70,5 +77,10 @@ public class NamedNioEventLoopGroup extends 
NioEventLoopGroup {
         public NetworkThread(ThreadGroup group, Runnable target, String name) {
             super(group, target, name);
         }
+
+        @Override
+        public Set<ThreadOperation> allowedOperations() {
+            return ALLOWED_OPERATIONS;
+        }
     }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index e245959970..ed784318cc 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -27,7 +27,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -157,6 +160,8 @@ import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
@@ -180,6 +185,7 @@ import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.VaultService;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
+import org.apache.ignite.internal.worker.ThreadAssertions;
 import 
org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
@@ -553,14 +559,13 @@ public class IgniteImpl implements Ignite {
 
         GcConfiguration gcConfig = 
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
 
-        dataStorageMgr = new DataStorageManager(
-                dataStorageModules.createStorageEngines(
-                        name,
-                        nodeConfigRegistry,
-                        storagePath,
-                        longJvmPauseDetector
-                )
+        Map<String, StorageEngine> storageEngines = 
dataStorageModules.createStorageEngines(
+                name,
+                nodeConfigRegistry,
+                storagePath,
+                longJvmPauseDetector
         );
+        dataStorageMgr = new 
DataStorageManager(applyThreadAssertionsIfNeeded(storageEngines));
 
         volatileLogStorageFactoryCreator = new 
VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
 
@@ -745,6 +750,20 @@ public class IgniteImpl implements Ignite {
         restComponent = createRestComponent(name);
     }
 
+    private static Map<String, StorageEngine> 
applyThreadAssertionsIfNeeded(Map<String, StorageEngine> storageEngines) {
+        if (!ThreadAssertions.enabled()) {
+            return storageEngines;
+        }
+
+        Map<String, StorageEngine> decoratedEngines = new HashMap<>();
+
+        for (Entry<String, StorageEngine> entry : storageEngines.entrySet()) {
+            decoratedEngines.put(entry.getKey(), new 
ThreadAssertingStorageEngine(entry.getValue()));
+        }
+
+        return Map.copyOf(decoratedEngines);
+    }
+
     private static SameValueLongSupplier 
delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) {
         return new SameValueLongSupplier(() -> 
schemaSyncConfig.delayDuration().value());
     }
diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index f64dd596df..5c4a1bc0a2 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -30,6 +30,7 @@ dependencies {
     implementation project(':ignite-configuration')
     implementation project(":ignite-core")
     implementation project(":ignite-catalog")
+    implementation project(":ignite-workers")
     implementation libs.jetbrains.annotations
     implementation libs.auto.service.annotations
 
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java
new file mode 100644
index 0000000000..4f85219b25
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link Cursor} that performs thread assertions when doing read operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingCursor<T> implements Cursor<T> {
+    private final Cursor<T> cursor;
+
+    /** Constructor. */
+    public ThreadAssertingCursor(Cursor<T> cursor) {
+        this.cursor = cursor;
+    }
+
+    @Override
+    public void close() {
+        cursor.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+        assertThreadAllowsToRead();
+
+        return cursor.hasNext();
+    }
+
+    @Override
+    public T next() {
+        assertThreadAllowsToRead();
+
+        return cursor.next();
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
new file mode 100644
index 0000000000..5fddecd805
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.gc.GcEntry;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage} that performs thread assertions when doing 
read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingMvPartitionStorage implements MvPartitionStorage {
+    private final MvPartitionStorage partitionStorage;
+
+    /** Constructor. */
+    public ThreadAssertingMvPartitionStorage(MvPartitionStorage 
partitionStorage) {
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
+        return partitionStorage.runConsistently(closure);
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        assertThreadAllowsToWrite();
+
+        return partitionStorage.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return partitionStorage.lastAppliedIndex();
+    }
+
+    @Override
+    public long lastAppliedTerm() {
+        return partitionStorage.lastAppliedTerm();
+    }
+
+    @Override
+    public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) 
throws StorageException {
+        partitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+    }
+
+    @Override
+    public byte @Nullable [] committedGroupConfiguration() {
+        return partitionStorage.committedGroupConfiguration();
+    }
+
+    @Override
+    public void committedGroupConfiguration(byte[] config) {
+        partitionStorage.committedGroupConfiguration(config);
+    }
+
+    @Override
+    public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
+        assertThreadAllowsToRead();
+
+        return partitionStorage.read(rowId, timestamp);
+    }
+
+    @Override
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, int commitTableId, int commitPartitionId)
+            throws TxIdMismatchException, StorageException {
+        assertThreadAllowsToWrite();
+
+        return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
+    }
+
+    @Override
+    public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
+        assertThreadAllowsToWrite();
+
+        return partitionStorage.abortWrite(rowId);
+    }
+
+    @Override
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
+        assertThreadAllowsToWrite();
+
+        partitionStorage.commitWrite(rowId, timestamp);
+    }
+
+    @Override
+    public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, 
HybridTimestamp commitTimestamp) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        partitionStorage.addWriteCommitted(rowId, row, commitTimestamp);
+    }
+
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
+        assertThreadAllowsToRead();
+
+        return new 
ThreadAssertingCursor<>(partitionStorage.scanVersions(rowId));
+    }
+
+    @Override
+    public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException {
+        assertThreadAllowsToRead();
+
+        return new 
ThreadAssertingPartitionTimestampCursor(partitionStorage.scan(timestamp));
+    }
+
+    @Override
+    public @Nullable RowId closestRowId(RowId lowerBound) throws 
StorageException {
+        assertThreadAllowsToRead();
+
+        return partitionStorage.closestRowId(lowerBound);
+    }
+
+    @Override
+    public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+        assertThreadAllowsToRead();
+
+        return partitionStorage.peek(lowWatermark);
+    }
+
+    @Override
+    public @Nullable BinaryRow vacuum(GcEntry entry) {
+        assertThreadAllowsToWrite();
+
+        return partitionStorage.vacuum(entry);
+    }
+
+    @Override
+    public long rowsCount() throws StorageException {
+        assertThreadAllowsToRead();
+
+        return partitionStorage.rowsCount();
+    }
+
+    @Override
+    public void close() {
+        partitionStorage.close();
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java
new file mode 100644
index 0000000000..17a8f32c86
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PartitionTimestampCursor} that performs thread assertions when doing 
read operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingPartitionTimestampCursor extends 
ThreadAssertingCursor<ReadResult> implements PartitionTimestampCursor {
+    private final PartitionTimestampCursor cursor;
+
+    /** Constructor. */
+    public ThreadAssertingPartitionTimestampCursor(PartitionTimestampCursor 
cursor) {
+        super(cursor);
+
+        this.cursor = cursor;
+    }
+
+    @Override
+    public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+        assertThreadAllowsToRead();
+
+        return cursor.committed(timestamp);
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
new file mode 100644
index 0000000000..97950786df
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.ThreadAssertingMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.ThreadAssertingHashIndexStorage;
+import 
org.apache.ignite.internal.storage.index.ThreadAssertingSortedIndexStorage;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvTableStorage} that performs thread assertions when doing 
read/write operations and wraps substorages it
+ * creates to guarantee the same behavior for them.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingMvTableStorage implements MvTableStorage {
+    private final MvTableStorage tableStorage;
+
+    /** Constructor. */
+    public ThreadAssertingMvTableStorage(MvTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Override
+    public void close() throws Exception {
+        tableStorage.close();
+    }
+
+    @Override
+    public CompletableFuture<MvPartitionStorage> createMvPartition(int 
partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.createMvPartition(partitionId)
+                .thenApply(ThreadAssertingMvPartitionStorage::new);
+    }
+
+    @Override
+    public @Nullable MvPartitionStorage getMvPartition(int partitionId) {
+        return tableStorage.getMvPartition(partitionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroyPartition(int partitionId) throws 
StorageException {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.destroyPartition(partitionId);
+    }
+
+    @Override
+    public SortedIndexStorage getOrCreateSortedIndex(int partitionId, 
StorageSortedIndexDescriptor indexDescriptor) {
+        assertThreadAllowsToWrite();
+
+        SortedIndexStorage indexStorage = 
tableStorage.getOrCreateSortedIndex(partitionId, indexDescriptor);
+        return new ThreadAssertingSortedIndexStorage(indexStorage);
+    }
+
+    @Override
+    public HashIndexStorage getOrCreateHashIndex(int partitionId, 
StorageHashIndexDescriptor indexDescriptor) {
+        assertThreadAllowsToWrite();
+
+        HashIndexStorage indexStorage = 
tableStorage.getOrCreateHashIndex(partitionId, indexDescriptor);
+        return new ThreadAssertingHashIndexStorage(indexStorage);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroyIndex(int indexId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.destroyIndex(indexId);
+    }
+
+    @Override
+    public boolean isVolatile() {
+        return tableStorage.isVolatile();
+    }
+
+    @Override
+    public void start() throws StorageException {
+        tableStorage.start();
+    }
+
+    @Override
+    public void stop() throws StorageException {
+        tableStorage.stop();
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.destroy();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.startRebalancePartition(partitionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.abortRebalancePartition(partitionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalancePartition(
+            int partitionId,
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            byte[] groupConfig
+    ) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.finishRebalancePartition(partitionId, 
lastAppliedIndex, lastAppliedTerm, groupConfig);
+    }
+
+    @Override
+    public CompletableFuture<Void> clearPartition(int partitionId) {
+        assertThreadAllowsToWrite();
+
+        return tableStorage.clearPartition(partitionId);
+    }
+
+    @Override
+    public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
+        return tableStorage.getIndex(partitionId, indexId);
+    }
+
+    @Override
+    public StorageTableDescriptor getTableDescriptor() {
+        return tableStorage.getTableDescriptor();
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
new file mode 100644
index 0000000000..d9f02b1773
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link StorageEngine} decorator: every {@link MvTableStorage} it creates is wrapped into a
+ * {@link ThreadAssertingMvTableStorage}, so that thread assertions are performed on read/write operations.
+ * All other calls are forwarded to the wrapped engine unchanged.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingStorageEngine implements StorageEngine {
+    /** Engine that does the actual work. */
+    private final StorageEngine delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param storageEngine Engine to wrap.
+     */
+    public ThreadAssertingStorageEngine(StorageEngine storageEngine) {
+        delegate = storageEngine;
+    }
+
+    /** Returns the name reported by the wrapped engine. */
+    @Override
+    public String name() {
+        return delegate.name();
+    }
+
+    /** Starts the wrapped engine. */
+    @Override
+    public void start() throws StorageException {
+        delegate.start();
+    }
+
+    /** Stops the wrapped engine. */
+    @Override
+    public void stop() throws StorageException {
+        delegate.stop();
+    }
+
+    /** Returns the volatility flag reported by the wrapped engine. */
+    @Override
+    public boolean isVolatile() {
+        return delegate.isVolatile();
+    }
+
+    /**
+     * Creates a table storage in the wrapped engine and decorates it with {@link ThreadAssertingMvTableStorage}.
+     *
+     * @param tableDescriptor Table descriptor, forwarded unchanged.
+     * @param indexDescriptorSupplier Index descriptor supplier, forwarded unchanged.
+     * @return Thread-asserting wrapper around the storage created by the wrapped engine.
+     */
+    @Override
+    public MvTableStorage createMvTable(
+            StorageTableDescriptor tableDescriptor,
+            StorageIndexDescriptorSupplier indexDescriptorSupplier
+    ) {
+        return new ThreadAssertingMvTableStorage(delegate.createMvTable(tableDescriptor, indexDescriptorSupplier));
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
new file mode 100644
index 0000000000..5e1ca534c1
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link HashIndexStorage} decorator that performs thread assertions when doing read/write operations.
+ *
+ * <p>Operations shared by all index storages are inherited from {@link ThreadAssertingIndexStorage}; this class
+ * adds {@link #indexDescriptor()} and {@link #destroy()}.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingHashIndexStorage extends ThreadAssertingIndexStorage implements HashIndexStorage {
+    /** Wrapped storage, kept with its hash-specific type. */
+    private final HashIndexStorage hashIndexStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param indexStorage Hash index storage to wrap.
+     */
+    public ThreadAssertingHashIndexStorage(HashIndexStorage indexStorage) {
+        super(indexStorage);
+
+        hashIndexStorage = indexStorage;
+    }
+
+    /** Returns the descriptor of the wrapped storage; no thread assertion is made. */
+    @Override
+    public StorageHashIndexDescriptor indexDescriptor() {
+        StorageHashIndexDescriptor descriptor = hashIndexStorage.indexDescriptor();
+
+        return descriptor;
+    }
+
+    /** Runs the write thread assertion, then destroys the wrapped storage. */
+    @Override
+    public void destroy() throws StorageException {
+        assertThreadAllowsToWrite();
+
+        hashIndexStorage.destroy();
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
new file mode 100644
index 0000000000..10560742f3
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.ThreadAssertingCursor;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base for {@link IndexStorage} decorators that perform thread assertions when doing read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+abstract class ThreadAssertingIndexStorage implements IndexStorage {
+    /** Storage that does the actual work. */
+    private final IndexStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param indexStorage Index storage to wrap.
+     */
+    ThreadAssertingIndexStorage(IndexStorage indexStorage) {
+        delegate = indexStorage;
+    }
+
+    /**
+     * Runs the read thread assertion, fetches the cursor from the wrapped storage and decorates it with
+     * {@link ThreadAssertingCursor}.
+     */
+    @Override
+    public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+        assertThreadAllowsToRead();
+
+        Cursor<RowId> rowIds = delegate.get(key);
+
+        return new ThreadAssertingCursor<>(rowIds);
+    }
+
+    /** Runs the write thread assertion, then puts the row into the wrapped storage. */
+    @Override
+    public void put(IndexRow row) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        delegate.put(row);
+    }
+
+    /** Runs the write thread assertion, then removes the row from the wrapped storage. */
+    @Override
+    public void remove(IndexRow row) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        delegate.remove(row);
+    }
+
+    /** Returns the value reported by the wrapped storage. */
+    @Override
+    public @Nullable RowId getNextRowIdToBuild() throws StorageException {
+        // NOTE(review): unlike get(), no read assertion is made here - confirm this is intended.
+        @Nullable RowId nextRowId = delegate.getNextRowIdToBuild();
+
+        return nextRowId;
+    }
+
+    /** Runs the write thread assertion, then passes the row ID to the wrapped storage. */
+    @Override
+    public void setNextRowIdToBuild(@Nullable RowId rowId) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        delegate.setNextRowIdToBuild(rowId);
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
new file mode 100644
index 0000000000..8e084b642f
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.storage.ThreadAssertingCursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PeekCursor} decorator that performs thread assertions when doing read operations.
+ *
+ * <p>Everything except {@link #peek()} is inherited from {@link ThreadAssertingCursor}.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingPeekCursor<T> extends ThreadAssertingCursor<T> implements PeekCursor<T> {
+    /** Wrapped cursor, kept with its peek-capable type. */
+    private final PeekCursor<T> peekCursor;
+
+    /**
+     * Constructor.
+     *
+     * @param cursor Cursor to wrap.
+     */
+    public ThreadAssertingPeekCursor(PeekCursor<T> cursor) {
+        super(cursor);
+
+        peekCursor = cursor;
+    }
+
+    /** Runs the read thread assertion, then returns what the wrapped cursor's {@code peek()} returns. */
+    @Override
+    public @Nullable T peek() {
+        assertThreadAllowsToRead();
+
+        @Nullable T peeked = peekCursor.peek();
+
+        return peeked;
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
new file mode 100644
index 0000000000..0b61260cfc
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SortedIndexStorage} decorator that performs thread assertions when doing read/write operations.
+ *
+ * <p>Operations shared by all index storages are inherited from {@link ThreadAssertingIndexStorage}; this class
+ * adds {@link #indexDescriptor()} and {@link #scan(BinaryTuplePrefix, BinaryTuplePrefix, int)}.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingSortedIndexStorage extends ThreadAssertingIndexStorage implements SortedIndexStorage {
+    /** Wrapped storage, kept with its sorted-specific type. */
+    private final SortedIndexStorage sortedIndexStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param indexStorage Sorted index storage to wrap.
+     */
+    public ThreadAssertingSortedIndexStorage(SortedIndexStorage indexStorage) {
+        super(indexStorage);
+
+        sortedIndexStorage = indexStorage;
+    }
+
+    /** Returns the descriptor of the wrapped storage; no thread assertion is made. */
+    @Override
+    public StorageSortedIndexDescriptor indexDescriptor() {
+        StorageSortedIndexDescriptor descriptor = sortedIndexStorage.indexDescriptor();
+
+        return descriptor;
+    }
+
+    /**
+     * Runs the read thread assertion, starts a scan on the wrapped storage and decorates the resulting cursor
+     * with {@link ThreadAssertingPeekCursor}. All arguments are forwarded unchanged.
+     */
+    @Override
+    public PeekCursor<IndexRow> scan(
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            int flags
+    ) {
+        assertThreadAllowsToRead();
+
+        PeekCursor<IndexRow> rows = sortedIndexStorage.scan(lowerBound, upperBound, flags);
+
+        return new ThreadAssertingPeekCursor<>(rows);
+    }
+}
diff --git 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
new file mode 100644
index 0000000000..64b54c2c35
--- /dev/null
+++ 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.worker;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
+
+/**
+ * Tools to assert that the current thread allows to perform a requested operation.
+ *
+ * <p>At the moment a failed check is only reported to the log as a warning, accompanied by a tracker exception
+ * whose stack trace shows where the check was made; nothing is thrown (see the TODO in
+ * {@code assertThreadAllowsTo()}).
+ */
+public class ThreadAssertions {
+    /** Name of the system property that switches thread assertions on or off. */
+    public static final String ENABLED_PROPERTY = "ignite.thread.assertions.enabled";
+
+    private static final IgniteLogger LOG = Loggers.forClass(ThreadAssertions.class);
+
+    /** Value of {@link #ENABLED_PROPERTY}, parsed once at class initialization; {@code true} if the property is not set. */
+    private static final boolean ENABLED = Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "true"));
+
+    /** This class only hosts static members, so it is not meant to be instantiated. */
+    private ThreadAssertions() {
+        // No-op.
+    }
+
+    /**
+     * Returns {@code true} if thread assertions are enabled.
+     *
+     * <p>Note that the {@code assert*} methods of this class do not check this flag themselves.
+     */
+    public static boolean enabled() {
+        return ENABLED;
+    }
+
+    /**
+     * Assert that the current thread allows to perform {@link ThreadOperation#STORAGE_WRITE} operations.
+     */
+    public static void assertThreadAllowsToWrite() {
+        assertThreadAllowsTo(ThreadOperation.STORAGE_WRITE);
+    }
+
+    /**
+     * Assert that the current thread allows to perform {@link ThreadOperation#STORAGE_READ} operations.
+     */
+    public static void assertThreadAllowsToRead() {
+        assertThreadAllowsTo(ThreadOperation.STORAGE_READ);
+    }
+
+    /**
+     * Checks whether the current thread allows the requested operation and logs a warning if it does not.
+     *
+     * <p>A thread passes the check only if it implements {@link ThreadAttributes} and its
+     * {@code allows(requestedOperation)} returns {@code true}.
+     *
+     * @param requestedOperation Operation the caller is about to perform.
+     */
+    private static void assertThreadAllowsTo(ThreadOperation requestedOperation) {
+        Thread currentThread = Thread.currentThread();
+
+        // TODO: IGNITE-21439 - actually throw AssertionError if the operation is not allowed.
+
+        // A thread that carries no attributes at all cannot allow anything: report it and stop here.
+        if (!(currentThread instanceof ThreadAttributes)) {
+            LOG.warn("Thread {} does not have allowed operations", trackerException(), currentThread);
+
+            return;
+        }
+
+        if (!((ThreadAttributes) currentThread).allows(requestedOperation)) {
+            LOG.warn("Thread {} is not allowed to {}", trackerException(), currentThread, requestedOperation);
+        }
+    }
+
+    /** Creates an exception that is never thrown; it is passed to the logger so the call site's stack trace gets logged. */
+    private static Exception trackerException() {
+        return new Exception("Tracker");
+    }
+}


Reply via email to