This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 31d3a200d4 IGNITE-22940 Expose partition size to public API (#4733)
31d3a200d4 is described below

commit 31d3a200d4df164f9a06b151051c8c5d4a016069
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 19 07:38:40 2024 +0300

    IGNITE-22940 Expose partition size to public API (#4733)
---
 .../disaster/LocalPartitionStateMessage.java       |  3 +
 .../api/recovery/LocalPartitionStateResponse.java  | 10 +++-
 .../recovery/ItDisasterRecoveryControllerTest.java | 23 ++++++++
 .../rest/recovery/DisasterRecoveryController.java  |  3 +-
 .../disaster/ItDisasterRecoverySystemViewTest.java | 64 ++++++++++++++++++++--
 .../disaster/DisasterRecoveryManager.java          | 24 +++++++-
 .../disaster/DisasterRecoverySystemViews.java      |  2 +
 .../distributed/disaster/LocalPartitionState.java  |  7 ++-
 8 files changed, 126 insertions(+), 10 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
index df8ec000b6..7d0a97ead0 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
@@ -35,4 +35,7 @@ public interface LocalPartitionStateMessage extends NetworkMessage {
 
     /** Index of the last received log entry for this partition. */
     long logIndex();
+
+    /** Estimated number of rows for this partition. */
+    long estimatedRows();
 }
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
index 19aebff88f..48213c1733 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
@@ -34,6 +34,7 @@ public class LocalPartitionStateResponse {
     private final String tableName;
     private final String nodeName;
     private final String state;
+    private final long estimatedRows;
 
     /**
      * Constructor.
@@ -46,7 +47,8 @@ public class LocalPartitionStateResponse {
             @JsonProperty("tableId") int tableId,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("partitionId") int partitionId,
-            @JsonProperty("state") String state
+            @JsonProperty("state") String state,
+            @JsonProperty("estimatedRows") long estimatedRows
     ) {
         this.partitionId = partitionId;
         this.tableId = tableId;
@@ -55,6 +57,7 @@ public class LocalPartitionStateResponse {
         this.zoneName = zoneName;
         this.nodeName = nodeName;
         this.state = state;
+        this.estimatedRows = estimatedRows;
     }
 
     @JsonGetter("partitionId")
@@ -91,4 +94,9 @@ public class LocalPartitionStateResponse {
     public String state() {
         return state;
     }
+
+    @JsonGetter("estimatedRows")
+    public long estimatedRows() {
+        return estimatedRows;
+    }
 }
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index 1498d0c12d..9501337eac 100644
--- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_P
 import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
@@ -438,6 +439,28 @@ public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegration
         ));
     }
 
+    @Test
+    void testLocalPartitionStatesWithUpdatedEstimatedRows() {
+        insertRowToAllTables(1, 1);
+
+        HttpResponse<LocalPartitionStatesResponse> response = client.toBlocking().exchange(
+                "/state/local/",
+                LocalPartitionStatesResponse.class
+        );
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        Set<Long> estimatedRows = response.body().states().stream().map(LocalPartitionStateResponse::estimatedRows).collect(toSet());
+
+        assertThat(estimatedRows, containsInAnyOrder(0L, 1L));
+    }
+
+    private static void insertRowToAllTables(int id, int val) {
+        ZONES_CONTAINING_TABLES.forEach(name -> {
+            sql(String.format("INSERT INTO PUBLIC.\"%s_table\" (id, val) values (%s, %s)", name, id, val));
+        });
+    }
+
     private static void checkLocalStates(List<LocalPartitionStateResponse> states, Set<String> zoneNames, Set<String> nodes) {
         assertFalse(states.isEmpty());
 
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index fcec2e49a1..2d1b6c4441 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -117,7 +117,8 @@ public class DisasterRecoveryController implements DisasterRecoveryApi, Resource
                         state.tableId,
                         state.tableName,
                         state.partitionId,
-                        state.state.name()
+                        state.state.name(),
+                        state.estimatedRows
                 ));
             }
         }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
index c2eb7bd58d..07f4fd2fd6 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -19,15 +19,19 @@ package org.apache.ignite.internal.disaster;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
 import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -36,6 +40,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.restart.RestartProofIgnite;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
 import org.junit.jupiter.api.AfterEach;
@@ -102,10 +107,44 @@ public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
         int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
 
         assertQuery(localPartitionStatesSystemViewSql())
-                .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name())
-                .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 1, HEALTHY.name())
-                .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name())
-                .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 1, HEALTHY.name())
+                .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 0L)
+                .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 1, HEALTHY.name(), 0L)
+                .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 0L)
+                .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 1, HEALTHY.name(), 0L)
+                .check();
+    }
+
+    @Test
+    void testLocalPartitionStatesSystemViewWithUpdatedEstimatedRows() throws Exception {
+        assertEquals(2, initialNodes());
+
+        int partitionsCount = 1;
+
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), partitionsCount);
+
+        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+
+        insertPeople(
+                TABLE_NAME,
+                new Person(1, "foo_name", 100.0),
+                new Person(2, "bar_name", 200.0)
+        );
+
+        List<String> nodeNames = CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
+
+        int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+
+        // Small wait is specially added so that the follower can execute the replicated "insert" command and the counter is honestly
+        // increased.
+        assertTrue(waitForCondition(
+                () -> nodeNames.stream().allMatch(nodeName -> estimatedSize(nodeName, TABLE_NAME, 0) >= 2L),
+                10,
+                1_000
+        ));
+
+        assertQuery(localPartitionStatesSystemViewSql())
+                .returns(nodeNames.get(0), ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
+                .returns(nodeNames.get(1), ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
                 .check();
     }
 
@@ -139,7 +178,8 @@ public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
     }
 
     private static String localPartitionStatesSystemViewSql() {
-        return "SELECT NODE_NAME, ZONE_NAME, TABLE_ID, SCHEMA_NAME, TABLE_NAME, PARTITION_ID, STATE FROM SYSTEM.LOCAL_PARTITION_STATES";
+        return "SELECT NODE_NAME, ZONE_NAME, TABLE_ID, SCHEMA_NAME, TABLE_NAME, PARTITION_ID, STATE, ESTIMATED_ROWS"
+                + " FROM SYSTEM.LOCAL_PARTITION_STATES";
     }
 
     private static int getTableId(String schemaName, String tableName) {
@@ -147,4 +187,18 @@ public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
 
         return catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName, tableName).id();
     }
+
+    private static long estimatedSize(String nodeName, String tableName, int partitionId) {
+        return CLUSTER.runningNodes()
+                .filter(ignite -> nodeName.equals(ignite.name()))
+                .map(ignite -> {
+                    TableImpl table = unwrapTableImpl(ignite.tables().table(tableName));
+
+                    return table.internalTable().storage().getMvPartition(partitionId);
+                })
+                .filter(Objects::nonNull)
+                .map(MvPartitionStorage::estimatedSize)
+                .findAny()
+                .orElse(-1L);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index b411af60c0..74994de8c7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -86,8 +86,10 @@ import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.systemview.api.SystemView;
 import org.apache.ignite.internal.systemview.api.SystemViewProvider;
+import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
 import org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
@@ -558,9 +560,10 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi
     }
 
     private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest request, ClusterNode sender, @Nullable Long correlationId) {
-        assert correlationId != null;
+        assert correlationId != null : "request=" + request + ", sender=" + sender;
 
         int catalogVersion = request.catalogVersion();
+
         catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
             List<LocalPartitionStateMessage> statesList = new ArrayList<>();
 
@@ -578,6 +581,21 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         return;
                     }
 
+                    // Since the raft service starts after registering a new table, we don't need to wait or write additional asynchronous
+                    // code.
+                    TableViewInternal tableViewInternal = tableManager.cachedTable(tablePartitionId.tableId());
+                    // Perhaps the table began to be stopped or destroyed.
+                    if (tableViewInternal == null) {
+                        return;
+                    }
+
+                    MvPartitionStorage partitionStorage = tableViewInternal.internalTable().storage()
+                            .getMvPartition(tablePartitionId.partitionId());
+                    // Perhaps the partition began to be stopped or destroyed.
+                    if (partitionStorage == null) {
+                        return;
+                    }
+
                     LocalPartitionStateEnumWithLogIndex localPartitionStateWithLogIndex =
                             LocalPartitionStateEnumWithLogIndex.of(raftGroupService.getRaftNode());
 
@@ -585,6 +603,7 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi
                             .partitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId))
                             .state(localPartitionStateWithLogIndex.state)
                             .logIndex(localPartitionStateWithLogIndex.logIndex)
+                            .estimatedRows(partitionStorage.estimatedSize())
                             .build()
                     );
                 }
@@ -659,7 +678,8 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi
                 tableDescriptor.id(),
                 tableDescriptor.name(),
                 tablePartitionId.partitionId(),
-                stateEnum
+                stateEnum,
+                stateMsg.estimatedRows()
         );
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
index acc4c8ca11..8a9b897b02 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.disaster;
 
 import static java.util.Comparator.comparing;
 import static org.apache.ignite.internal.type.NativeTypes.INT32;
+import static org.apache.ignite.internal.type.NativeTypes.INT64;
 import static org.apache.ignite.internal.type.NativeTypes.STRING;
 
 import java.util.Comparator;
@@ -65,6 +66,7 @@ class DisasterRecoverySystemViews {
                 .addColumn("TABLE_NAME", STRING, state -> state.state.tableName)
                 .addColumn("PARTITION_ID", INT32, state -> state.state.partitionId)
                 .addColumn("STATE", STRING, state -> state.state.state.name())
+                .addColumn("ESTIMATED_ROWS", INT64, state -> state.state.estimatedRows)
                 .dataProvider(systemViewPublisher(() -> localPartitionStatesAsync(manager)))
                 .build();
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
index be297dd4a7..f553801c8d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
@@ -43,13 +43,17 @@ public class LocalPartitionState {
     @IgniteToStringInclude
     public final LocalPartitionStateEnum state;
 
+    @IgniteToStringInclude
+    public final long estimatedRows;
+
     LocalPartitionState(
             String zoneName,
             String schemaName,
             int tableId,
             String tableName,
             int partitionId,
-            LocalPartitionStateEnum state
+            LocalPartitionStateEnum state,
+            long estimatedRows
     ) {
         this.tableId = tableId;
         this.schemaName = schemaName;
@@ -57,6 +61,7 @@ public class LocalPartitionState {
         this.zoneName = zoneName;
         this.partitionId = partitionId;
         this.state = state;
+        this.estimatedRows = estimatedRows;
     }
 
     @Override

Reply via email to