This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 31d3a200d4 IGNITE-22940 Expose partition size to public API (#4733)
31d3a200d4 is described below
commit 31d3a200d4df164f9a06b151051c8c5d4a016069
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 19 07:38:40 2024 +0300
IGNITE-22940 Expose partition size to public API (#4733)
---
.../disaster/LocalPartitionStateMessage.java | 3 +
.../api/recovery/LocalPartitionStateResponse.java | 10 +++-
.../recovery/ItDisasterRecoveryControllerTest.java | 23 ++++++++
.../rest/recovery/DisasterRecoveryController.java | 3 +-
.../disaster/ItDisasterRecoverySystemViewTest.java | 64 ++++++++++++++++++++--
.../disaster/DisasterRecoveryManager.java | 24 +++++++-
.../disaster/DisasterRecoverySystemViews.java | 2 +
.../distributed/disaster/LocalPartitionState.java | 7 ++-
8 files changed, 126 insertions(+), 10 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
index df8ec000b6..7d0a97ead0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStateMessage.java
@@ -35,4 +35,7 @@ public interface LocalPartitionStateMessage extends
NetworkMessage {
/** Index of the last received log entry for this partition. */
long logIndex();
+
+ /** Estimated number of rows for this partition. */
+ long estimatedRows();
}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
index 19aebff88f..48213c1733 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
@@ -34,6 +34,7 @@ public class LocalPartitionStateResponse {
private final String tableName;
private final String nodeName;
private final String state;
+ private final long estimatedRows;
/**
* Constructor.
@@ -46,7 +47,8 @@ public class LocalPartitionStateResponse {
@JsonProperty("tableId") int tableId,
@JsonProperty("tableName") String tableName,
@JsonProperty("partitionId") int partitionId,
- @JsonProperty("state") String state
+ @JsonProperty("state") String state,
+ @JsonProperty("estimatedRows") long estimatedRows
) {
this.partitionId = partitionId;
this.tableId = tableId;
@@ -55,6 +57,7 @@ public class LocalPartitionStateResponse {
this.zoneName = zoneName;
this.nodeName = nodeName;
this.state = state;
+ this.estimatedRows = estimatedRows;
}
@JsonGetter("partitionId")
@@ -91,4 +94,9 @@ public class LocalPartitionStateResponse {
public String state() {
return state;
}
+
+ @JsonGetter("estimatedRows")
+ public long estimatedRows() {
+ return estimatedRows;
+ }
}
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index 1498d0c12d..9501337eac 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -26,6 +26,7 @@ import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_P
import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
@@ -438,6 +439,28 @@ public class ItDisasterRecoveryControllerTest extends
ClusterPerClassIntegration
));
}
+ @Test
+ void testLocalPartitionStatesWithUpdatedEstimatedRows() {
+ insertRowToAllTables(1, 1);
+
+ HttpResponse<LocalPartitionStatesResponse> response =
client.toBlocking().exchange(
+ "/state/local/",
+ LocalPartitionStatesResponse.class
+ );
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ Set<Long> estimatedRows =
response.body().states().stream().map(LocalPartitionStateResponse::estimatedRows).collect(toSet());
+
+ assertThat(estimatedRows, containsInAnyOrder(0L, 1L));
+ }
+
+ private static void insertRowToAllTables(int id, int val) {
+ ZONES_CONTAINING_TABLES.forEach(name -> {
+ sql(String.format("INSERT INTO PUBLIC.\"%s_table\" (id, val)
values (%s, %s)", name, id, val));
+ });
+ }
+
private static void checkLocalStates(List<LocalPartitionStateResponse>
states, Set<String> zoneNames, Set<String> nodes) {
assertFalse(states.isEmpty());
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index fcec2e49a1..2d1b6c4441 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -117,7 +117,8 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
state.tableId,
state.tableName,
state.partitionId,
- state.state.name()
+ state.state.name(),
+ state.estimatedRows
));
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
index c2eb7bd58d..07f4fd2fd6 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -19,15 +19,19 @@ package org.apache.ignite.internal.disaster;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
+import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -36,6 +40,7 @@ import
org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.restart.RestartProofIgnite;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
import org.junit.jupiter.api.AfterEach;
@@ -102,10 +107,44 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
assertQuery(localPartitionStatesSystemViewSql())
- .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 0, HEALTHY.name())
- .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 1, HEALTHY.name())
- .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 0, HEALTHY.name())
- .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 1, HEALTHY.name())
+ .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 0, HEALTHY.name(), 0L)
+ .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 1, HEALTHY.name(), 0L)
+ .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 0, HEALTHY.name(), 0L)
+ .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 1, HEALTHY.name(), 0L)
+ .check();
+ }
+
+ @Test
+ void testLocalPartitionStatesSystemViewWithUpdatedEstimatedRows() throws
Exception {
+ assertEquals(2, initialNodes());
+
+ int partitionsCount = 1;
+
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
+
+ waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+
+ insertPeople(
+ TABLE_NAME,
+ new Person(1, "foo_name", 100.0),
+ new Person(2, "bar_name", 200.0)
+ );
+
+ List<String> nodeNames =
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
+
+ int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+
+ // Small wait is specially added so that the follower can execute the
replicated "insert" command and the counter is honestly
+ // increased.
+ assertTrue(waitForCondition(
+ () -> nodeNames.stream().allMatch(nodeName ->
estimatedSize(nodeName, TABLE_NAME, 0) >= 2L),
+ 10,
+ 1_000
+ ));
+
+ assertQuery(localPartitionStatesSystemViewSql())
+ .returns(nodeNames.get(0), ZONE_NAME, tableId,
DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
+ .returns(nodeNames.get(1), ZONE_NAME, tableId,
DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
.check();
}
@@ -139,7 +178,8 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
}
private static String localPartitionStatesSystemViewSql() {
- return "SELECT NODE_NAME, ZONE_NAME, TABLE_ID, SCHEMA_NAME,
TABLE_NAME, PARTITION_ID, STATE FROM SYSTEM.LOCAL_PARTITION_STATES";
+ return "SELECT NODE_NAME, ZONE_NAME, TABLE_ID, SCHEMA_NAME,
TABLE_NAME, PARTITION_ID, STATE, ESTIMATED_ROWS"
+ + " FROM SYSTEM.LOCAL_PARTITION_STATES";
}
private static int getTableId(String schemaName, String tableName) {
@@ -147,4 +187,18 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
return
catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName,
tableName).id();
}
+
+ private static long estimatedSize(String nodeName, String tableName, int
partitionId) {
+ return CLUSTER.runningNodes()
+ .filter(ignite -> nodeName.equals(ignite.name()))
+ .map(ignite -> {
+ TableImpl table =
unwrapTableImpl(ignite.tables().table(tableName));
+
+ return
table.internalTable().storage().getMvPartition(partitionId);
+ })
+ .filter(Objects::nonNull)
+ .map(MvPartitionStorage::estimatedSize)
+ .findAny()
+ .orElse(-1L);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index b411af60c0..74994de8c7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -86,8 +86,10 @@ import
org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
+import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
@@ -558,9 +560,10 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}
private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest
request, ClusterNode sender, @Nullable Long correlationId) {
- assert correlationId != null;
+ assert correlationId != null : "request=" + request + ", sender=" +
sender;
int catalogVersion = request.catalogVersion();
+
catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
List<LocalPartitionStateMessage> statesList = new ArrayList<>();
@@ -578,6 +581,21 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return;
}
+ // Since the raft service starts after registering a new
table, we don't need to wait or write additional asynchronous
+ // code.
+ TableViewInternal tableViewInternal =
tableManager.cachedTable(tablePartitionId.tableId());
+ // Perhaps the table began to be stopped or destroyed.
+ if (tableViewInternal == null) {
+ return;
+ }
+
+ MvPartitionStorage partitionStorage =
tableViewInternal.internalTable().storage()
+ .getMvPartition(tablePartitionId.partitionId());
+ // Perhaps the partition began to be stopped or destroyed.
+ if (partitionStorage == null) {
+ return;
+ }
+
LocalPartitionStateEnumWithLogIndex
localPartitionStateWithLogIndex =
LocalPartitionStateEnumWithLogIndex.of(raftGroupService.getRaftNode());
@@ -585,6 +603,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
.partitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
tablePartitionId))
.state(localPartitionStateWithLogIndex.state)
.logIndex(localPartitionStateWithLogIndex.logIndex)
+ .estimatedRows(partitionStorage.estimatedSize())
.build()
);
}
@@ -659,7 +678,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
tableDescriptor.id(),
tableDescriptor.name(),
tablePartitionId.partitionId(),
- stateEnum
+ stateEnum,
+ stateMsg.estimatedRows()
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
index acc4c8ca11..8a9b897b02 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Comparator.comparing;
import static org.apache.ignite.internal.type.NativeTypes.INT32;
+import static org.apache.ignite.internal.type.NativeTypes.INT64;
import static org.apache.ignite.internal.type.NativeTypes.STRING;
import java.util.Comparator;
@@ -65,6 +66,7 @@ class DisasterRecoverySystemViews {
.addColumn("TABLE_NAME", STRING, state ->
state.state.tableName)
.addColumn("PARTITION_ID", INT32, state ->
state.state.partitionId)
.addColumn("STATE", STRING, state -> state.state.state.name())
+ .addColumn("ESTIMATED_ROWS", INT64, state ->
state.state.estimatedRows)
.dataProvider(systemViewPublisher(() ->
localPartitionStatesAsync(manager)))
.build();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
index be297dd4a7..f553801c8d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
@@ -43,13 +43,17 @@ public class LocalPartitionState {
@IgniteToStringInclude
public final LocalPartitionStateEnum state;
+ @IgniteToStringInclude
+ public final long estimatedRows;
+
LocalPartitionState(
String zoneName,
String schemaName,
int tableId,
String tableName,
int partitionId,
- LocalPartitionStateEnum state
+ LocalPartitionStateEnum state,
+ long estimatedRows
) {
this.tableId = tableId;
this.schemaName = schemaName;
@@ -57,6 +61,7 @@ public class LocalPartitionState {
this.zoneName = zoneName;
this.partitionId = partitionId;
this.state = state;
+ this.estimatedRows = estimatedRows;
}
@Override