This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 11ee199daa IGNITE-18539 Implement build procedure for new indexes 
(#1800)
11ee199daa is described below

commit 11ee199daabd4cdc2c450b4de96e0239190c81dd
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Fri Mar 31 14:12:15 2023 +0300

    IGNITE-18539 Implement build procedure for new indexes (#1800)
---
 modules/index/build.gradle                         |   3 +
 .../apache/ignite/internal/index/IndexBuilder.java | 274 +++++++++++++++++++++
 .../apache/ignite/internal/index/IndexManager.java |  19 +-
 .../ignite/internal/index/IndexManagerTest.java    |   3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../internal/sql/engine/ItBuildIndexTest.java      | 143 +++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   2 +-
 .../ignite/distributed/ItTablePersistenceTest.java |   9 +-
 .../distributed/ItTxDistributedTestSingleNode.java |  11 +-
 .../apache/ignite/internal/table/TableImpl.java    |  42 +++-
 .../table/distributed/StorageUpdateHandler.java    |  43 +++-
 .../distributed/TableIndexStoragesSupplier.java    |  40 +++
 .../table/distributed/TableMessageGroup.java       |   4 +
 .../distributed/command/BuildIndexCommand.java     |  50 ++++
 .../table/distributed/raft/PartitionListener.java  |  32 +++
 .../internal/table/distributed/IndexBaseTest.java  |  17 +-
 .../PartitionGcOnWriteConcurrentTest.java          |   9 +-
 .../table/distributed/PartitionGcOnWriteTest.java  |  10 +-
 .../distributed/StorageUpdateHandlerTest.java      | 132 ++++++++++
 .../raft/PartitionCommandListenerTest.java         |  93 +++++--
 .../PartitionReplicaListenerIndexLockingTest.java  |   3 +-
 .../replication/PartitionReplicaListenerTest.java  |   3 +-
 .../table/impl/DummyInternalTableImpl.java         |  24 +-
 24 files changed, 909 insertions(+), 61 deletions(-)

diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index c104b76510..33c33510ba 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -26,6 +26,9 @@ dependencies {
     implementation project(':ignite-schema')
     implementation project(':ignite-table')
     implementation project(':ignite-transactions')
+    implementation project(':ignite-storage-api')
+    implementation project(':ignite-network-api')
+    implementation project(':ignite-raft-api')
     implementation libs.jetbrains.annotations
 
     testImplementation(testFixtures(project(':ignite-configuration')))
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
new file mode 100644
index 0000000000..15364e5298
--- /dev/null
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class for managing the index building process.
+ */
+class IndexBuilder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(IndexBuilder.class);
+
+    /** Batch size of row IDs to build the index. */
+    private static final int BUILD_INDEX_ROW_ID_BATCH_SIZE = 100;
+
+    /** Message factory to create messages - RAFT commands. */
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** Index building executor. */
+    private final ExecutorService buildIndexExecutor;
+
+    IndexBuilder(String nodeName, IgniteSpinBusyLock busyLock, ClusterService 
clusterService) {
+        this.busyLock = busyLock;
+        this.clusterService = clusterService;
+
+        int cpus = Runtime.getRuntime().availableProcessors();
+
+        buildIndexExecutor = new ThreadPoolExecutor(
+                cpus,
+                cpus,
+                30,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "build-index", LOG)
+        );
+    }
+
+    /**
+     * Stops the index builder.
+     */
+    void stop() {
+        shutdownAndAwaitTermination(buildIndexExecutor, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Initializes the build of the index.
+     */
+    void startIndexBuild(TableIndexView tableIndexView, TableImpl table) {
+        for (int partitionId = 0; partitionId < 
table.internalTable().partitions(); partitionId++) {
+            // TODO: IGNITE-19177 Add assignments check
+            buildIndexExecutor.submit(new BuildIndexTask(table, 
tableIndexView, partitionId, null));
+        }
+    }
+
+    /**
+     * Task of building a table index for a partition.
+     *
+     * <p>Only the leader of the raft group manages the building of the 
index. The leader sends batches of row IDs via
+     * {@link BuildIndexCommand}; the next batch is sent only after the 
previous batch has been processed.
+     *
+     * <p>Index building itself occurs locally on each node of the raft group 
when processing {@link BuildIndexCommand}. This ensures that
+     * the index build process in the raft group is consistent and that the 
index build process is restored after restarting the raft group
+     * (not from the beginning).
+     */
+    private class BuildIndexTask implements Runnable {
+        private final TableImpl table;
+
+        private final TableIndexView tableIndexView;
+
+        private final int partitionId;
+
+        /**
+         * ID of the row from which to continue building the index, carried 
over from the previous batch; {@code null} if this is the first batch after the index was created
+         * (both on a live node and after a restore).
+         */
+        private final @Nullable RowId nextRowIdToBuildFromPreviousBatch;
+
+        private BuildIndexTask(
+                TableImpl table,
+                TableIndexView tableIndexView,
+                int partitionId,
+                @Nullable RowId nextRowIdToBuildFromPreviousBatch
+        ) {
+            this.table = table;
+            this.tableIndexView = tableIndexView;
+            this.partitionId = partitionId;
+            this.nextRowIdToBuildFromPreviousBatch = 
nextRowIdToBuildFromPreviousBatch;
+        }
+
+        @Override
+        public void run() {
+            if (!busyLock.enterBusy()) {
+                return;
+            }
+
+            try {
+                // By the time the index is created, we should have already 
waited for the table to be created and for its raft clients
+                // (services) to start for all partitions, so there should be 
no errors.
+                RaftGroupService raftGroupService = 
table.internalTable().partitionRaftGroupService(partitionId);
+
+                raftGroupService
+                        // We deliberately do not check the presence of nodes in the 
topology, so as not to get into races on
+                        // rebalancing; it is more convenient and 
reliable for us to wait for a stable topology with a chosen
+                        // leader.
+                        .refreshAndGetLeaderWithTerm()
+                        .thenComposeAsync(leaderWithTerm -> {
+                            if (!busyLock.enterBusy()) {
+                                return completedFuture(null);
+                            }
+
+                            try {
+                                // At this point, we have a stable topology, 
each node of which has already applied all local updates.
+                                if 
(!localNodeConsistentId().equals(leaderWithTerm.leader().consistentId())) {
+                                    // TODO: IGNITE-19053 Must handle the 
change of leader
+                                    // TODO: IGNITE-19053 Add a test to change 
the leader even at the start of the task
+                                    return completedFuture(null);
+                                }
+
+                                List<RowId> batchRowIds = collectRowIdBatch();
+
+                                RowId nextRowId = 
getNextRowIdForNextBatch(batchRowIds);
+
+                                boolean finish = batchRowIds.size() < 
BUILD_INDEX_ROW_ID_BATCH_SIZE || nextRowId == null;
+
+                                // TODO: IGNITE-19053 Must handle the change 
of leader
+                                return 
raftGroupService.run(createBuildIndexCommand(batchRowIds, finish))
+                                        .thenRun(() -> {
+                                            if (!finish) {
+                                                assert nextRowId != null : 
createCommonTableIndexInfo();
+
+                                                buildIndexExecutor.submit(
+                                                        new 
BuildIndexTask(table, tableIndexView, partitionId, nextRowId)
+                                                );
+                                            }
+                                        });
+                            } finally {
+                                busyLock.leaveBusy();
+                            }
+                        }, buildIndexExecutor)
+                        .whenComplete((unused, throwable) -> {
+                            if (throwable != null) {
+                                LOG.error("Index build error: [{}]", 
throwable, createCommonTableIndexInfo());
+                            }
+                        });
+            } catch (Throwable t) {
+                LOG.error("Index build error: [{}]", t, 
createCommonTableIndexInfo());
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        private boolean isLocalNodeLeader(RaftGroupService raftGroupService) {
+            Peer leader = raftGroupService.leader();
+
+            assert leader != null : "tableId=" + table.tableId() + ", 
partitionId=" + partitionId;
+
+            return localNodeConsistentId().equals(leader.consistentId());
+        }
+
+        private List<RowId> createBatchRowIds(RowId lastBuiltRowId, int 
batchSize) {
+            MvPartitionStorage mvPartition = 
table.internalTable().storage().getMvPartition(partitionId);
+
+            assert mvPartition != null : createCommonTableIndexInfo();
+
+            List<RowId> batch = new ArrayList<>(batchSize);
+
+            for (int i = 0; i < batchSize && lastBuiltRowId != null; i++) {
+                lastBuiltRowId = mvPartition.closestRowId(lastBuiltRowId);
+
+                if (lastBuiltRowId == null) {
+                    break;
+                }
+
+                batch.add(lastBuiltRowId);
+
+                lastBuiltRowId = lastBuiltRowId.increment();
+            }
+
+            return batch;
+        }
+
+        private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds, 
boolean finish) {
+            return TABLE_MESSAGES_FACTORY.buildIndexCommand()
+                    
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
+                            .tableId(table.tableId())
+                            .partitionId(partitionId)
+                            .build()
+                    )
+                    .indexId(tableIndexView.id())
+                    .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
+                    .finish(finish)
+                    .build();
+        }
+
+        private String createCommonTableIndexInfo() {
+            return "table=" + table.name() + ", tableId=" + table.tableId()
+                    + ", partitionId=" + partitionId
+                    + ", index=" + tableIndexView.name() + ", indexId=" + 
tableIndexView.id();
+        }
+
+        private String localNodeConsistentId() {
+            return clusterService.topologyService().localMember().name();
+        }
+
+        private @Nullable RowId getNextRowIdForNextBatch(List<RowId> batch) {
+            return batch.isEmpty() ? null : batch.get(batch.size() - 
1).increment();
+        }
+
+        private @Nullable List<RowId> collectRowIdBatch() {
+            RowId nextRowIdToBuild;
+
+            if (nextRowIdToBuildFromPreviousBatch == null) {
+                nextRowIdToBuild = 
table.internalTable().storage().getOrCreateIndex(partitionId, 
tableIndexView.id()).getNextRowIdToBuild();
+
+                if (nextRowIdToBuild == null) {
+                    // Index has already been built.
+                    return null;
+                }
+            } else {
+                nextRowIdToBuild = nextRowIdToBuildFromPreviousBatch;
+            }
+
+            if (nextRowIdToBuildFromPreviousBatch == null) {
+                LOG.info("Start building the index: [{}]", 
createCommonTableIndexInfo());
+            }
+
+            return createBatchRowIds(nextRowIdToBuild, 
BUILD_INDEX_ROW_ID_BATCH_SIZE);
+        }
+    }
+}
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index ed514ba196..4b0caa0ccf 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -67,6 +67,7 @@ import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.network.ClusterService;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -92,17 +93,29 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
+    /** Index builder. */
+    private final IndexBuilder indexBuilder;
+
     /**
      * Constructor.
      *
+     * @param nodeName Node name.
      * @param tablesCfg Tables and indexes configuration.
      * @param schemaManager Schema manager.
      * @param tableManager Table manager.
+     * @param clusterService Cluster service.
      */
-    public IndexManager(TablesConfiguration tablesCfg, SchemaManager 
schemaManager, TableManager tableManager) {
+    public IndexManager(
+            String nodeName,
+            TablesConfiguration tablesCfg,
+            SchemaManager schemaManager,
+            TableManager tableManager,
+            ClusterService clusterService
+    ) {
         this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
         this.schemaManager = Objects.requireNonNull(schemaManager, 
"schemaManager");
         this.tableManager = tableManager;
+        this.indexBuilder = new IndexBuilder(nodeName, busyLock, 
clusterService);
     }
 
     /** {@inheritDoc} */
@@ -159,6 +172,8 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
 
         busyLock.block();
 
+        indexBuilder.stop();
+
         LOG.info("Index manager stopped");
     }
 
@@ -418,6 +433,8 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
                     table.pkId(indexId);
                 }
             }
+
+            indexBuilder.startIndexBuild(tableIndexView, table);
         });
 
         return allOf(createIndexFuture, fireEventFuture);
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 99e8ff769b..0be0aef1ed 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexNotFoundException;
+import org.apache.ignite.network.ClusterService;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -92,7 +93,7 @@ public class IndexManagerTest {
 
         when(schManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(null));
 
-        indexManager = new IndexManager(tablesConfig, schManager, 
tableManagerMock);
+        indexManager = new IndexManager("test", tablesConfig, schManager, 
tableManagerMock, mock(ClusterService.class));
         indexManager.start();
 
         assertThat(
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index b17823bb1c..94ab5b0fd5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -391,7 +391,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 topologyAwareRaftGroupServiceFactory
         );
 
-        var indexManager = new IndexManager(tablesConfiguration, 
schemaManager, tableManager);
+        var indexManager = new IndexManager(name, tablesConfiguration, 
schemaManager, tableManager, clusterSvc);
 
         CatalogManager catalogManager = new CatalogServiceImpl(metaStorageMgr);
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
new file mode 100644
index 0000000000..3ef6863171
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static java.util.stream.Collectors.joining;
+import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Integration test of index building.
+ */
+public class ItBuildIndexTest extends ClusterPerClassIntegrationTest {
+    private static final String ZONE_NAME = "zone_table";
+
+    private static final String TABLE_NAME = "test_table";
+
+    private static final String INDEX_NAME = "test_index";
+
+    @AfterEach
+    void tearDown() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+    }
+
+    @ParameterizedTest
+    @MethodSource("replicas")
+    void testBuildIndexOnStableTopology(int replicas) throws Exception {
+        sql(IgniteStringFormatter.format("CREATE ZONE IF NOT EXISTS {} WITH 
REPLICAS={}, PARTITIONS={}",
+                ZONE_NAME, replicas, 2
+        ));
+
+        sql(IgniteStringFormatter.format(
+                "CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) WITH 
PRIMARY_ZONE='{}'",
+                TABLE_NAME, ZONE_NAME.toUpperCase()
+        ));
+
+        sql(IgniteStringFormatter.format(
+                "INSERT INTO {} VALUES {}",
+                TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2), 
List.of(3, 3), List.of(4, 4), List.of(5, 5))
+        ));
+
+        sql(IgniteStringFormatter.format("CREATE INDEX {} ON {} (i1)", 
INDEX_NAME, TABLE_NAME));
+
+        // TODO: IGNITE-18733
+        waitForIndex(INDEX_NAME);
+
+        waitForIndexBuild(TABLE_NAME, INDEX_NAME);
+
+        assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 > 
0", TABLE_NAME))
+                .matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(), 
INDEX_NAME.toUpperCase()))
+                .returns(1, 1)
+                .returns(2, 2)
+                .returns(3, 3)
+                .returns(4, 4)
+                .returns(5, 5)
+                .check();
+    }
+
+    private static int[] replicas() {
+        // TODO: IGNITE-19086 Fix NullPointerException on insertAll
+        //        return new int[]{1, 2, 3};
+        return new int[]{1};
+    }
+
+    private static String toValuesString(List<Object>... values) {
+        return Stream.of(values)
+                .peek(Assertions::assertNotNull)
+                .map(objects -> 
objects.stream().map(Object::toString).collect(joining(", ", "(", ")")))
+                .collect(joining(", "));
+    }
+
+    private void waitForIndexBuild(String tableName, String indexName) throws 
Exception {
+        for (Ignite clusterNode : CLUSTER_NODES) {
+            CompletableFuture<Table> tableFuture = 
clusterNode.tables().tableAsync(tableName);
+
+            assertThat(tableFuture, willCompleteSuccessfully());
+
+            TableImpl tableImpl = (TableImpl) tableFuture.join();
+
+            InternalTable internalTable = tableImpl.internalTable();
+
+            UUID indexId = ((IgniteImpl) clusterNode).clusterConfiguration()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .indexes()
+                    .get(INDEX_NAME.toUpperCase())
+                    .id()
+                    .value();
+
+            assertNotNull(indexId, "table=" + tableName + ", index=" + 
indexName);
+
+            for (int partitionId = 0; partitionId < 
internalTable.partitions(); partitionId++) {
+                RaftGroupService raftGroupService = 
internalTable.partitionRaftGroupService(partitionId);
+
+                Stream<Peer> allPeers = 
Stream.concat(Stream.of(raftGroupService.leader()), 
raftGroupService.peers().stream());
+
+                if 
(allPeers.map(Peer::consistentId).noneMatch(clusterNode.name()::equals)) {
+                    continue;
+                }
+
+                IndexStorage index = 
internalTable.storage().getOrCreateIndex(partitionId, indexId);
+
+                assertTrue(waitForCondition(() -> index.getNextRowIdToBuild() 
== null, 10, 10_000));
+            }
+        }
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index dc715c2d12..76087f301e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -486,7 +486,7 @@ public class IgniteImpl implements Ignite {
                 topologyAwareRaftGroupServiceFactory
         );
 
-        indexManager = new IndexManager(tablesConfiguration, schemaManager, 
distributedTblMgr);
+        indexManager = new IndexManager(name, tablesConfiguration, 
schemaManager, distributedTblMgr, clusterSvc);
 
         catalogManager = new CatalogServiceImpl(metaStorageMgr);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0ef110270d..06253e8b62 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -249,7 +249,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         tblManager = mockManagers();
 
-        idxManager = new IndexManager(tblsCfg, schemaManager, tblManager);
+        idxManager = new IndexManager(NODE_NAME, tblsCfg, schemaManager, 
tblManager, cs);
 
         idxManager.start();
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 7f0977a79a..667016c8a5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -82,6 +82,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -375,8 +376,12 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
 
                     PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
 
-                    StorageUpdateHandler storageUpdateHandler =
-                            new StorageUpdateHandler(0, partitionDataStorage, 
Map::of, tableCfg.dataStorage());
+                    StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(
+                            0,
+                            partitionDataStorage,
+                            
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()),
+                            tableCfg.dataStorage()
+                    );
 
                     PartitionListener listener = new PartitionListener(
                             partitionDataStorage,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 93d6e253af..e9e3152a8d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -41,7 +41,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
@@ -90,6 +89,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -440,8 +440,13 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                 PendingComparableValuesTracker<Long> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
 
                 PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMpPartStorage);
-                Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () 
-> Map.of(pkStorage.get().id(), pkStorage.get());
-                StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(partId, partitionDataStorage, indexes, dsCfg);
+
+                StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(
+                        partId,
+                        partitionDataStorage,
+                        
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(),
 pkStorage.get())),
+                        dsCfg
+                );
 
                 TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
                         clusterServices.get(assignment),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 560c0b0543..1e406f54c6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.lang.ErrorGroups;
@@ -222,20 +223,28 @@ public class TableImpl implements Table {
     }
 
     /** Returns a supplier of index storage wrapper factories for given 
partition. */
-    public Supplier<Map<UUID, TableSchemaAwareIndexStorage>> 
indexStorageAdapters(int partId) {
-        return () -> {
-            awaitIndexes();
+    public TableIndexStoragesSupplier indexStorageAdapters(int partId) {
+        return new TableIndexStoragesSupplier() {
+            @Override
+            public Map<UUID, TableSchemaAwareIndexStorage> get() {
+                awaitIndexes();
 
-            List<IndexStorageAdapterFactory> factories = new 
ArrayList<>(indexStorageAdapterFactories.values());
+                List<IndexStorageAdapterFactory> factories = new 
ArrayList<>(indexStorageAdapterFactories.values());
 
-            Map<UUID, TableSchemaAwareIndexStorage> adapters = new HashMap<>();
+                Map<UUID, TableSchemaAwareIndexStorage> adapters = new 
HashMap<>();
 
-            for (IndexStorageAdapterFactory factory : factories) {
-                TableSchemaAwareIndexStorage storage = factory.create(partId);
-                adapters.put(storage.id(), storage);
+                for (IndexStorageAdapterFactory factory : factories) {
+                    TableSchemaAwareIndexStorage storage = 
factory.create(partId);
+                    adapters.put(storage.id(), storage);
+                }
+
+                return adapters;
             }
 
-            return adapters;
+            @Override
+            public void addIndexToWaitIfAbsent(UUID indexId) {
+                addIndexesToWait(List.of(indexId));
+            }
         };
     }
 
@@ -385,14 +394,25 @@ public class TableImpl implements Table {
     }
 
     /**
-     * Adds indexes to wait before inserting data into the table.
+     * Registers indexes to wait for before inserting data into the table, skipping indexes that are already created.
      *
      * @param indexIds Indexes Index IDs.
      */
     // TODO: IGNITE-19082 Needs to be redone/improved
     public void addIndexesToWait(Collection<UUID> indexIds) {
         for (UUID indexId : indexIds) {
-            indexesToWait.computeIfAbsent(indexId, uuid -> new 
CompletableFuture<>());
+            indexesToWait.compute(indexId, (indexId0, awaitIndexFuture) -> {
+                if (awaitIndexFuture != null) {
+                    return awaitIndexFuture;
+                }
+
+                if (indexStorageAdapterFactories.containsKey(indexId) && 
indexLockerFactories.containsKey(indexId)) {
+                    // Index is already registered and created.
+                    return null;
+                }
+
+                return new CompletableFuture<>();
+            });
         }
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index b842038cae..de76d70584 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -54,7 +53,7 @@ public class StorageUpdateHandler {
     /** Partition storage with access to MV data of a partition. */
     private final PartitionDataStorage storage;
 
-    private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
+    private final TableIndexStoragesSupplier indexes;
 
     /** Last recorded GC low watermark. */
     private final AtomicReference<HybridTimestamp> lastRecordedLwm = new 
AtomicReference<>();
@@ -73,7 +72,7 @@ public class StorageUpdateHandler {
     public StorageUpdateHandler(
             int partitionId,
             PartitionDataStorage storage,
-            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
+            TableIndexStoragesSupplier indexes,
             DataStorageConfiguration dsCfg
     ) {
         this.partitionId = partitionId;
@@ -416,4 +415,42 @@ public class StorageUpdateHandler {
     public void waitIndexes() {
         indexes.get();
     }
+
+    /**
+     * Builds an index for all versions of a row.
+     *
+     * <p>Index is expected to exist, skips the tombstones.
+     *
+     * @param indexId Index ID.
+     * @param rowUuids Row uuids.
+     * @param finish Index build completion flag.
+     */
+    public void buildIndex(UUID indexId, List<UUID> rowUuids, boolean finish) {
+        // TODO: IGNITE-19082 Need another way to wait for index creation
+        indexes.addIndexToWaitIfAbsent(indexId);
+
+        TableSchemaAwareIndexStorage index = indexes.get().get(indexId);
+
+        assert index != null : "indexId=" + indexId + ", partitionId=" + 
partitionId;
+
+        RowId lastRowId = null;
+
+        for (UUID rowUuid : rowUuids) {
+            lastRowId = new RowId(partitionId, rowUuid);
+
+            try (Cursor<ReadResult> cursor = storage.scanVersions(lastRowId)) {
+                while (cursor.hasNext()) {
+                    ReadResult next = cursor.next();
+
+                    if (!next.isEmpty()) {
+                        index.put(next.binaryRow(), lastRowId);
+                    }
+                }
+            }
+        }
+
+        assert lastRowId != null || finish : "indexId=" + indexId + ", 
partitionId=" + partitionId;
+
+        index.storage().setNextRowIdToBuild(finish ? null : 
lastRowId.increment());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIndexStoragesSupplier.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIndexStoragesSupplier.java
new file mode 100644
index 0000000000..bd0bb96b49
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIndexStoragesSupplier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Supplier of table index storages.
+ */
+public interface TableIndexStoragesSupplier {
+    /**
+     * Returns indexes by their ID.
+     *
+     * <p>Waits for the primary key index and all other registered indexes to 
be created.
+     */
+    Map<UUID, TableSchemaAwareIndexStorage> get();
+
+    /**
+     * Registers the index to be awaited if it has not been created yet.
+     *
+     * @param indexId Index ID.
+     */
+    void addIndexToWaitIfAbsent(UUID indexId);
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 8a2b1990d1..88373fd5e5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed;
 
 import static 
org.apache.ignite.internal.table.distributed.TableMessageGroup.GROUP_TYPE;
 
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
@@ -152,6 +153,9 @@ public interface TableMessageGroup {
         /** Message type for {@link UpdateCommand}. */
         short UPDATE = 43;
 
+        /** Message type for {@link BuildIndexCommand}. */
+        short BUILD_INDEX = 44;
+
         /** Message type for {@link TablePartitionIdMessage}. */
         short TABLE_PARTITION_ID = 61;
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
new file mode 100644
index 0000000000..dd0527fa4a
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * State machine command to build a table index.
+ */
+@Transferable(TableMessageGroup.Commands.BUILD_INDEX)
+public interface BuildIndexCommand extends WriteCommand {
+    /**
+     * Returns ID of table partition.
+     */
+    TablePartitionIdMessage tablePartitionId();
+
+    /**
+     * Returns index ID.
+     */
+    UUID indexId();
+
+    /**
+     * Returns row IDs for which to build indexes.
+     */
+    List<UUID> rowIds();
+
+    /**
+     * Returns {@code true} if this batch is the last one.
+     */
+    boolean finish();
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 04c6fb16e8..0954dea8ff 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
@@ -165,6 +166,8 @@ public class PartitionListener implements RaftGroupListener 
{
                     handleTxCleanupCommand((TxCleanupCommand) command, 
commandIndex, commandTerm);
                 } else if (command instanceof SafeTimeSyncCommand) {
                     handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, 
commandIndex, commandTerm);
+                } else if (command instanceof BuildIndexCommand) {
+                    handleBuildIndexCommand((BuildIndexCommand) command, 
commandIndex, commandTerm);
                 } else {
                     assert false : "Command was not found [cmd=" + command + 
']';
                 }
@@ -422,4 +425,33 @@ public class PartitionListener implements 
RaftGroupListener {
     public MvPartitionStorage getMvStorage() {
         return storage.getStorage();
     }
+
+    /**
+     * Handler for the {@link BuildIndexCommand}.
+     *
+     * @param cmd Command.
+     * @param commandIndex RAFT index of the command.
+     * @param commandTerm RAFT term of the command.
+     */
+    void handleBuildIndexCommand(BuildIndexCommand cmd, long commandIndex, 
long commandTerm) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
+        storage.runConsistently(() -> {
+            storageUpdateHandler.buildIndex(cmd.indexId(), cmd.rowIds(), 
cmd.finish());
+
+            storage.lastApplied(commandIndex, commandTerm);
+
+            return null;
+        });
+
+        if (cmd.finish()) {
+            LOG.info(
+                    "Finish building the index: [tableId={}, partitionId={}, 
indexId={}]",
+                    cmd.tablePartitionId().tableId(), 
cmd.tablePartitionId().partitionId(), cmd.indexId()
+            );
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 30ffda06d3..fbb24b636c 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedInde
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
@@ -118,12 +119,16 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
 
         storage = new TestMvPartitionStorage(PARTITION_ID);
 
-        storageUpdateHandler = new StorageUpdateHandler(PARTITION_ID, new 
TestPartitionDataStorage(storage),
-                () -> Map.of(
-                        pkIndexId, pkStorage,
-                        sortedIndexId, sortedIndexStorage,
-                        hashIndexId, hashIndexStorage
-                ),
+        Map<UUID, TableSchemaAwareIndexStorage> indexes = Map.of(
+                pkIndexId, pkStorage,
+                sortedIndexId, sortedIndexStorage,
+                hashIndexId, hashIndexStorage
+        );
+
+        storageUpdateHandler = new StorageUpdateHandler(
+                PARTITION_ID,
+                new TestPartitionDataStorage(storage),
+                
DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes),
                 dsCfg
         );
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
index 2c3997498a..c782bb327e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
@@ -41,6 +42,7 @@ import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfig
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -68,7 +70,12 @@ public class PartitionGcOnWriteConcurrentTest {
 
         when(storage.pollForVacuum(any())).thenReturn(null);
 
-        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+        storageUpdateHandler = new StorageUpdateHandler(
+                PARTITION_ID,
+                new TestPartitionDataStorage(storage),
+                
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()),
+                dsCfg
+        );
     }
 
     @ParameterizedTest
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
index 62a3006d77..39721d97b1 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
@@ -21,8 +21,8 @@ import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
@@ -58,7 +59,12 @@ public class PartitionGcOnWriteTest extends 
BaseMvStoragesTest {
     void setUp(@InjectConfiguration("mock.gcOnUpdateBatchSize=" + 
GC_BATCH_SIZE) DataStorageConfiguration dsCfg) {
         storage = new TestMvPartitionStorage(1);
 
-        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+        storageUpdateHandler = new StorageUpdateHandler(
+                1,
+                new TestPartitionDataStorage(storage),
+                
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()),
+                dsCfg
+        );
     }
 
     @ParameterizedTest
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..69590f7603
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link StorageUpdateHandler}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class StorageUpdateHandlerTest {
+    private static final int PARTITION_ID = 0;
+
+    @InjectConfiguration
+    private DataStorageConfiguration dataStorageConfig;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    void testBuildIndex() {
+        PartitionDataStorage partitionStorage = 
mock(PartitionDataStorage.class);
+
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        UUID indexId = UUID.randomUUID();
+
+        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
+
+        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
+
+        StorageUpdateHandler storageUpdateHandler = 
createStorageUpdateHandler(partitionStorage, indexes);
+
+        RowId rowId0 = new RowId(PARTITION_ID);
+        RowId rowId1 = new RowId(PARTITION_ID);
+
+        List<BinaryRow> rowVersions0 = asList(mock(BinaryRow.class), null);
+        List<BinaryRow> rowVersions1 = asList(mock(BinaryRow.class), null);
+
+        setRowVersions(partitionStorage, Map.of(rowId0.uuid(), rowVersions0, 
rowId1.uuid(), rowVersions1));
+
+        storageUpdateHandler.buildIndex(indexId, List.of(rowId0.uuid(), 
rowId1.uuid()), false);
+
+        verify(indexStorage).put(rowVersions0.get(0), rowId0);
+        verify(indexStorage, never()).put(rowVersions0.get(1), rowId0);
+
+        verify(indexStorage).put(rowVersions1.get(0), rowId1);
+        verify(indexStorage, never()).put(rowVersions1.get(1), rowId1);
+
+        verify(indexStorage.storage()).setNextRowIdToBuild(rowId1.increment());
+        verify(indexes).addIndexToWaitIfAbsent(indexId);
+
+        // Let's check one more batch - it will be the finishing one.
+        RowId rowId2 = new RowId(PARTITION_ID, UUID.randomUUID());
+
+        List<BinaryRow> rowVersions2 = singletonList(mock(BinaryRow.class));
+
+        setRowVersions(partitionStorage, Map.of(rowId2.uuid(), rowVersions2));
+
+        storageUpdateHandler.buildIndex(indexId, List.of(rowId2.uuid()), true);
+
+        verify(indexStorage).put(rowVersions2.get(0), rowId2);
+
+        verify(indexStorage.storage()).setNextRowIdToBuild(null);
+        verify(indexes, times(2)).addIndexToWaitIfAbsent(indexId);
+    }
+
+    private static TableSchemaAwareIndexStorage createIndexStorage() {
+        TableSchemaAwareIndexStorage indexStorage = 
mock(TableSchemaAwareIndexStorage.class);
+
+        IndexStorage storage = mock(IndexStorage.class);
+
+        when(indexStorage.storage()).thenReturn(storage);
+
+        return indexStorage;
+    }
+
+    private StorageUpdateHandler 
createStorageUpdateHandler(PartitionDataStorage partitionStorage, 
TableIndexStoragesSupplier indexes) {
+        return new StorageUpdateHandler(PARTITION_ID, partitionStorage, 
indexes, dataStorageConfig);
+    }
+
+    private void setRowVersions(PartitionDataStorage partitionStorage, 
Map<UUID, List<BinaryRow>> rowVersions) {
+        for (Entry<UUID, List<BinaryRow>> entry : rowVersions.entrySet()) {
+            RowId rowId = new RowId(PARTITION_ID, entry.getKey());
+
+            List<ReadResult> readResults = entry.getValue().stream()
+                    .map(binaryRow -> ReadResult.createFromCommitted(rowId, 
binaryRow, clock.now()))
+                    .collect(toList());
+
+            
when(partitionStorage.scanVersions(rowId)).thenReturn(Cursor.fromIterable(readResults));
+        }
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index fb847edfbd..216dee1119 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
@@ -50,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.TestHybridClock;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.CommittedConfiguration;
-import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
@@ -85,10 +84,12 @@ import 
org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.Timestamp;
@@ -128,9 +129,6 @@ public class PartitionCommandListenerTest {
             new Column[]{new Column("value", NativeTypes.INT32, false)}
     );
 
-    /** Hybrid clock. */
-    private static final HybridClock CLOCK = new HybridClockImpl();
-
     /** Table command listener. */
     private PartitionListener commandListener;
 
@@ -157,13 +155,13 @@ public class PartitionCommandListenerTest {
     private Path workDir;
 
     /** Factory for command messages. */
-    private TableMessagesFactory msgFactory = new TableMessagesFactory();
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
 
     /** Factory for replica messages. */
-    private ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
 
     /** Hybrid clock. */
-    private HybridClock hybridClock;
+    private final HybridClock hybridClock = new HybridClockImpl();
 
     /** Safe time tracker. */
     private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
@@ -173,6 +171,8 @@ public class PartitionCommandListenerTest {
 
     private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
 
+    private StorageUpdateHandler storageUpdateHandler;
+
     /**
      * Initializes a table listener before tests.
      */
@@ -184,15 +184,14 @@ public class PartitionCommandListenerTest {
 
         when(clusterService.topologyService().localMember().address()).thenReturn(addr);
 
-        ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
-
-        hybridClock = new HybridClockImpl();
-
         safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
 
-        Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.id(), pkStorage);
-
-        StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(0, partitionDataStorage, indexes, dsCfg);
+        storageUpdateHandler = spy(new StorageUpdateHandler(
+                PARTITION_ID,
+                partitionDataStorage,
+                DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.id(), pkStorage)),
+                dsCfg
+        ));
 
         commandListener = new PartitionListener(
                 partitionDataStorage,
@@ -281,9 +280,12 @@ public class PartitionCommandListenerTest {
     public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm(@InjectConfiguration DataStorageConfiguration dsCfg) {
         TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage);
 
-        Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.id(), pkStorage);
-
-        StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes, dsCfg);
+        StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
+                PARTITION_ID,
+                partitionDataStorage,
+                DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.id(), pkStorage)),
+                dsCfg
+        );
 
         PartitionListener testCommandListener = new PartitionListener(
                 partitionDataStorage,
@@ -462,6 +464,49 @@ public class PartitionCommandListenerTest {
         applySafeTimeCommand(SafeTimeSyncCommand.class, testClock.now());
     }
 
+    @Test
+    void testBuildIndexCommand() {
+        UUID indexId = UUID.randomUUID();
+
+        doNothing().when(storageUpdateHandler).buildIndex(eq(indexId), any(List.class), anyBoolean());
+
+        List<UUID> rowUuids0 = List.of(UUID.randomUUID());
+        List<UUID> rowUuids1 = List.of(UUID.randomUUID());
+        List<UUID> rowUuids2 = List.of(UUID.randomUUID());
+
+        InOrder inOrder = inOrder(partitionDataStorage, storageUpdateHandler);
+
+        commandListener.handleBuildIndexCommand(createBuildIndexCommand(indexId, rowUuids0, false), 10, 1);
+
+        inOrder.verify(storageUpdateHandler).buildIndex(indexId, rowUuids0, false);
+        inOrder.verify(partitionDataStorage).lastApplied(10, 1);
+
+        commandListener.handleBuildIndexCommand(createBuildIndexCommand(indexId, rowUuids1, true), 20, 2);
+
+        inOrder.verify(storageUpdateHandler).buildIndex(indexId, rowUuids1, true);
+        inOrder.verify(partitionDataStorage).lastApplied(20, 2);
+
+        // Let's check that the command with a lower commandIndex than in the storage will not be executed.
+        commandListener.handleBuildIndexCommand(createBuildIndexCommand(indexId, rowUuids2, false), 5, 1);
+
+        inOrder.verify(storageUpdateHandler, never()).buildIndex(indexId, rowUuids2, false);
+        inOrder.verify(partitionDataStorage, never()).lastApplied(5, 1);
+    }
+
+    private BuildIndexCommand createBuildIndexCommand(UUID indexId, List<UUID> rowUuids, boolean finish) {
+        return msgFactory.buildIndexCommand()
+                .tablePartitionId(
+                        msgFactory.tablePartitionIdMessage()
+                                .tableId(UUID.randomUUID())
+                                .partitionId(PARTITION_ID)
+                                .build()
+                )
+                .indexId(indexId)
+                .rowIds(rowUuids)
+                .finish(finish)
+                .build();
+    }
+
     private void applySafeTimeCommand(Class<? extends SafeTimePropagatingCommand> cls, HybridTimestamp timestamp) {
         HybridTimestampMessage safeTime = hybridTimestamp(timestamp);
 
@@ -572,7 +617,7 @@ public class PartitionCommandListenerTest {
             rows.put(Timestamp.nextVersion().toUuid(), row.byteBuffer());
         }
 
-        HybridTimestamp commitTimestamp = CLOCK.now();
+        HybridTimestamp commitTimestamp = hybridClock.now();
 
         invokeBatchedCommand(msgFactory.updateAllCommand()
                 .tablePartitionId(
@@ -608,7 +653,7 @@ public class PartitionCommandListenerTest {
             rows.put(readRow(row).uuid(), row.byteBuffer());
         }
 
-        HybridTimestamp commitTimestamp = CLOCK.now();
+        HybridTimestamp commitTimestamp = hybridClock.now();
 
         invokeBatchedCommand(msgFactory.updateAllCommand()
                 .tablePartitionId(
@@ -642,7 +687,7 @@ public class PartitionCommandListenerTest {
             keyRows.put(readRow(row).uuid(), null);
         }
 
-        HybridTimestamp commitTimestamp = CLOCK.now();
+        HybridTimestamp commitTimestamp = hybridClock.now();
 
         invokeBatchedCommand(msgFactory.updateAllCommand()
                 .tablePartitionId(
@@ -699,7 +744,7 @@ public class PartitionCommandListenerTest {
             }).when(clo).result(any());
         }));
 
-        HybridTimestamp commitTimestamp = CLOCK.now();
+        HybridTimestamp commitTimestamp = hybridClock.now();
 
         txIds.forEach(txId -> invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
@@ -743,7 +788,7 @@ public class PartitionCommandListenerTest {
             }).when(clo).result(any());
         }));
 
-        HybridTimestamp commitTimestamp = CLOCK.now();
+        HybridTimestamp commitTimestamp = hybridClock.now();
 
         txIds.forEach(txId -> invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
@@ -818,7 +863,7 @@ public class PartitionCommandListenerTest {
             }).when(clo).result(any());
         }));
 
-        HybridTimestamp now = CLOCK.now();
+        HybridTimestamp now = hybridClock.now();
 
         txIds.forEach(txId -> invokeBatchedCommand(
                 msgFactory.txCleanupCommand()
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 8cde444fe6..99f7a34063 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.Lock;
@@ -189,7 +190,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 new StorageUpdateHandler(
                         PART_ID,
                         new TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE),
-                        () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+                        DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get())),
                         dsCfg
                 ),
                 peer -> true,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 837307f0b9..ec6b5bc7f2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -103,6 +103,7 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -336,7 +337,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 new StorageUpdateHandler(
                         partId,
                         partitionDataStorage,
-                        () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+                        DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get())),
                         dsCfg
                 ),
                 peer -> localNode.name().equals(peer.consistentId()),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index ac641836e7..0d607f4f65 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.impl;
 
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -29,7 +30,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -263,7 +264,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
 
         PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
         PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage);
-        Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.get().id(), pkStorage.get());
+        TableIndexStoragesSupplier indexes = createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
 
         DataStorageConfiguration dsCfg = mock(DataStorageConfiguration.class);
         ConfigurationValue<Integer> gcBatchSizeValue = mock(ConfigurationValue.class);
@@ -398,4 +399,23 @@ public class DummyInternalTableImpl extends InternalTableImpl {
             safeTimeUpdaterThread = null;
         }
     }
+
+    /**
+     * Returns dummy table index storages supplier.
+     *
+     * @param indexes Index storage by ID.
+     */
+    public static TableIndexStoragesSupplier createTableIndexStoragesSupplier(Map<UUID, TableSchemaAwareIndexStorage> indexes) {
+        return new TableIndexStoragesSupplier() {
+            @Override
+            public Map<UUID, TableSchemaAwareIndexStorage> get() {
+                return indexes;
+            }
+
+            @Override
+            public void addIndexToWaitIfAbsent(UUID indexId) {
+                fail("not supported");
+            }
+        };
+    }
 }

Reply via email to