tkalkirill commented on code in PR #5276:
URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1971445766
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +142,73 @@ public void close() {
}
private void addListeners() {
- catalogService.listen(CatalogEvent.INDEX_BUILDING,
(StartBuildingIndexEventParameters parameters) -> {
- return onIndexBuilding(parameters).thenApply(unused -> false);
- });
+ catalogService.listen(INDEX_BUILDING,
+ (StartBuildingIndexEventParameters parameters) ->
onIndexBuilding(parameters).thenApply(unused -> false));
- catalogService.listen(CatalogEvent.INDEX_REMOVED,
(RemoveIndexEventParameters parameters) -> {
- return onIndexRemoved(parameters).thenApply(unused -> false);
- });
+ catalogService.listen(INDEX_REMOVED,
+ (RemoveIndexEventParameters parameters) ->
onIndexRemoved(parameters).thenApply(unused -> false));
- placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
parameters -> {
- return onPrimaryReplicaElected(parameters).thenApply(unused ->
false);
- });
+ placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters ->
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
}
private CompletableFuture<?>
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
Catalog catalog =
catalogService.catalog(parameters.catalogVersion());
- assert catalog != null : "Not found catalog for version " +
parameters.catalogVersion();
+ assert catalog != null : "Failed to find a catalog for the
specified version [version=" + parameters.catalogVersion()
Review Comment:
It would be great to bring this into the method, but that's a pipe dream.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/package-info.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains replica request handlers that is used by
+ * {@link
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener}.
+ */
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
Review Comment:
Wow, how unusual.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java:
##########
@@ -23,18 +23,33 @@
* {@link IndexBuildTask} ID.
*/
class IndexBuildTaskId {
+ private final int zoneId;
+
private final int tableId;
private final int partitionId;
private final int indexId;
- IndexBuildTaskId(int tableId, int partitionId, int indexId) {
+ /**
+ * Creates a new index building task.
+ *
+ * @param zoneId Distribution zone identifier.
+ * @param tableId Table identifier.
Review Comment:
```suggestion
* @param tableId Table ID.
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
+import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
+import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import
org.apache.ignite.internal.table.distributed.replicator.IndexBuilderTxRwOperationTracker;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Handler for {@link BuildIndexReplicaRequest}.
+ */
+public class BuildIndexReplicaRequestHandler {
+ /** Factory to create RAFT command messages. */
+ private static final PartitionReplicationMessagesFactory
PARTITION_REPLICATION_MESSAGES_FACTORY =
+ new PartitionReplicationMessagesFactory();
+
+ private final IndexMetaStorage indexMetaStorage;
+
+ /** Read-write transaction operation tracker for building indexes. */
+ private final IndexBuilderTxRwOperationTracker txRwOperationTracker;
+
+ /** Safe time. */
Review Comment:
Safe time of/for what?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java:
##########
@@ -23,18 +23,33 @@
* {@link IndexBuildTask} ID.
*/
class IndexBuildTaskId {
+ private final int zoneId;
+
private final int tableId;
private final int partitionId;
private final int indexId;
- IndexBuildTaskId(int tableId, int partitionId, int indexId) {
+ /**
+ * Creates a new index building task.
+ *
+ * @param zoneId Distribution zone identifier.
+ * @param tableId Table identifier.
+ * @param partitionId Partition identifier.
Review Comment:
```suggestion
* @param partitionId Partition ID.
```
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +233,113 @@ private CompletableFuture<?>
onIndexRemoved(RemoveIndexEventParameters parameter
private CompletableFuture<?>
onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
- TablePartitionId primaryReplicaId = (TablePartitionId)
parameters.groupId();
-
if (isLocalNode(clusterService, parameters.leaseholderId())) {
- primaryReplicaIds.add(primaryReplicaId);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24375
+ // Need to remove TablePartitionId check here and below.
+ assert parameters.groupId() instanceof ZonePartitionId ||
parameters.groupId() instanceof TablePartitionId :
+ "Primary replica ID must be of type ZonePartitionId or
TablePartitionId";
+
+ primaryReplicaIds.add(parameters.groupId());
// It is safe to get the latest version of the catalog because
the PRIMARY_REPLICA_ELECTED event is handled on the
// metastore thread.
Catalog catalog =
catalogService.catalog(catalogService.latestCatalogVersion());
- // TODO: IGNITE-22656 It is necessary not to generate an event
for a destroyed table by LWM
- if (catalog == null ||
catalog.table(primaryReplicaId.tableId()) == null) {
- return nullCompletedFuture();
- }
+ if (parameters.groupId() instanceof ZonePartitionId) {
+ assert enabledColocation() : "Primary replica ID must be
of type ZonePartitionId";
- return getMvTableStorageFuture(parameters.causalityToken(),
primaryReplicaId)
- .thenCompose(mvTableStorage ->
awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
- .thenAccept(replicaMeta ->
tryScheduleBuildIndexesForNewPrimaryReplica(
- catalog.version(),
- primaryReplicaId,
- mvTableStorage,
- replicaMeta
- ))
- );
+ ZonePartitionId primaryReplicaId = (ZonePartitionId)
parameters.groupId();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalog.zone(primaryReplicaId.zoneId());
+ // TODO: IGNITE-22656 It is necessary not to generate an
event for a destroyed zone by LWM
+ if (zoneDescriptor == null) {
+ return nullCompletedFuture();
+ }
+
+ var indexFutures = new ArrayList<CompletableFuture<?>>();
+ for (CatalogTableDescriptor tableDescriptor :
catalog.tables()) {
+ // 1. Perhaps, it makes sense to get primary replica
future first and then get table storage future,
+ // because, it will be the same for all tables in the
zone for the given partition.
+ // 2. Is it possible to filter out tables in efficient
way
+ // that do not have indices to avoid creating
unnecessary futures?
+ // It looks like
catalog.indexes(tableDescriptor.id()).isEmpty() is good enough.
+ // However, what about PK index?
+ CompletableFuture<?> future =
+
getMvTableStorageFuture(parameters.causalityToken(), tableDescriptor.id())
+ .thenCompose(mvTableStorage ->
awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
+ .thenAccept(replicaMeta ->
tryScheduleBuildIndexesForNewPrimaryReplica(
+ catalog.version(),
+ tableDescriptor,
+ primaryReplicaId,
+ mvTableStorage,
+ replicaMeta)));
+
+ indexFutures.add(future);
+ }
+
+ return
allOf(indexFutures.toArray(CompletableFuture[]::new));
Review Comment:
U can use `CompletableFutures#allOf`
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java:
##########
@@ -219,10 +223,13 @@ private List<RowId> createBatchRowIds() {
private BuildIndexReplicaRequest
createBuildIndexReplicaRequest(List<RowId> rowIds) {
boolean finish = rowIds.size() < batchSize;
- TablePartitionId tablePartitionId = new
TablePartitionId(taskId.getTableId(), taskId.getPartitionId());
+ ReplicationGroupIdMessage groupIdMessage = enabledColocation()
Review Comment:
Maybe a separate method that create `ReplicationGroupIdMessage` ?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +142,73 @@ public void close() {
}
private void addListeners() {
- catalogService.listen(CatalogEvent.INDEX_BUILDING,
(StartBuildingIndexEventParameters parameters) -> {
- return onIndexBuilding(parameters).thenApply(unused -> false);
- });
+ catalogService.listen(INDEX_BUILDING,
+ (StartBuildingIndexEventParameters parameters) ->
onIndexBuilding(parameters).thenApply(unused -> false));
- catalogService.listen(CatalogEvent.INDEX_REMOVED,
(RemoveIndexEventParameters parameters) -> {
- return onIndexRemoved(parameters).thenApply(unused -> false);
- });
+ catalogService.listen(INDEX_REMOVED,
+ (RemoveIndexEventParameters parameters) ->
onIndexRemoved(parameters).thenApply(unused -> false));
- placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
parameters -> {
- return onPrimaryReplicaElected(parameters).thenApply(unused ->
false);
- });
+ placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters ->
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
}
private CompletableFuture<?>
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
Catalog catalog =
catalogService.catalog(parameters.catalogVersion());
- assert catalog != null : "Not found catalog for version " +
parameters.catalogVersion();
+ assert catalog != null : "Failed to find a catalog for the
specified version [version=" + parameters.catalogVersion()
+ + ", earliestVersion=" +
catalogService.earliestCatalogVersion()
+ + ", latestVersion=" +
catalogService.latestCatalogVersion()
+ + "].";
CatalogIndexDescriptor indexDescriptor =
catalog.index(parameters.indexId());
+ assert indexDescriptor != null : "Failed to find an index
descriptor for the specified index [indexId="
Review Comment:
Same about method.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java:
##########
@@ -100,6 +101,7 @@ class IndexBuilder implements ManuallyCloseable {
* to the replica.
*/
public void scheduleBuildIndex(
+ int zoneId,
Review Comment:
Probably we should introduce a new class for these parameters, but it is not
necessary to do this now.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java:
##########
@@ -23,18 +23,33 @@
* {@link IndexBuildTask} ID.
*/
class IndexBuildTaskId {
+ private final int zoneId;
+
private final int tableId;
private final int partitionId;
private final int indexId;
- IndexBuildTaskId(int tableId, int partitionId, int indexId) {
+ /**
+ * Creates a new index building task.
+ *
+ * @param zoneId Distribution zone identifier.
+ * @param tableId Table identifier.
+ * @param partitionId Partition identifier.
+ * @param indexId Index identifier.
Review Comment:
```suggestion
* @param indexId Index ID.
```
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +142,73 @@ public void close() {
}
private void addListeners() {
- catalogService.listen(CatalogEvent.INDEX_BUILDING,
(StartBuildingIndexEventParameters parameters) -> {
- return onIndexBuilding(parameters).thenApply(unused -> false);
- });
+ catalogService.listen(INDEX_BUILDING,
+ (StartBuildingIndexEventParameters parameters) ->
onIndexBuilding(parameters).thenApply(unused -> false));
- catalogService.listen(CatalogEvent.INDEX_REMOVED,
(RemoveIndexEventParameters parameters) -> {
- return onIndexRemoved(parameters).thenApply(unused -> false);
- });
+ catalogService.listen(INDEX_REMOVED,
+ (RemoveIndexEventParameters parameters) ->
onIndexRemoved(parameters).thenApply(unused -> false));
- placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
parameters -> {
- return onPrimaryReplicaElected(parameters).thenApply(unused ->
false);
- });
+ placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters ->
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
}
private CompletableFuture<?>
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
Catalog catalog =
catalogService.catalog(parameters.catalogVersion());
- assert catalog != null : "Not found catalog for version " +
parameters.catalogVersion();
+ assert catalog != null : "Failed to find a catalog for the
specified version [version=" + parameters.catalogVersion()
+ + ", earliestVersion=" +
catalogService.earliestCatalogVersion()
+ + ", latestVersion=" +
catalogService.latestCatalogVersion()
+ + "].";
CatalogIndexDescriptor indexDescriptor =
catalog.index(parameters.indexId());
+ assert indexDescriptor != null : "Failed to find an index
descriptor for the specified index [indexId="
+ + parameters.indexId() + "].";
+
+ CatalogZoneDescriptor zoneDescriptor =
catalog.zone(catalog.table(indexDescriptor.tableId()).zoneId());
+
+ assert zoneDescriptor != null : "Failed to find a zone descriptor
for the specified table [indexId="
Review Comment:
Same about method.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +233,113 @@ private CompletableFuture<?>
onIndexRemoved(RemoveIndexEventParameters parameter
private CompletableFuture<?>
onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
- TablePartitionId primaryReplicaId = (TablePartitionId)
parameters.groupId();
-
if (isLocalNode(clusterService, parameters.leaseholderId())) {
- primaryReplicaIds.add(primaryReplicaId);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24375
+ // Need to remove TablePartitionId check here and below.
+ assert parameters.groupId() instanceof ZonePartitionId ||
parameters.groupId() instanceof TablePartitionId :
+ "Primary replica ID must be of type ZonePartitionId or
TablePartitionId";
Review Comment:
please add parameters.grouprId() to error message
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java:
##########
@@ -23,18 +23,33 @@
* {@link IndexBuildTask} ID.
*/
class IndexBuildTaskId {
+ private final int zoneId;
+
private final int tableId;
private final int partitionId;
private final int indexId;
- IndexBuildTaskId(int tableId, int partitionId, int indexId) {
+ /**
+ * Creates a new index building task.
+ *
+ * @param zoneId Distribution zone identifier.
Review Comment:
```suggestion
* @param zoneId Distribution zone ID.
```
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java:
##########
@@ -197,13 +201,15 @@ public void scheduleBuildIndexAfterDisasterRecovery(
/**
* Stops index building if it is in progress.
*
+ * @param zoneId Distribution zone ID.
* @param tableId Table ID.
* @param partitionId Partition ID.
* @param indexId Index ID.
*/
- public void stopBuildIndex(int tableId, int partitionId, int indexId) {
+ // TODO remove unused method
Review Comment:
why? If it is in this ticket, then it should be indicated.
##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java:
##########
@@ -146,6 +156,7 @@ void testStartBuildIndexesOnIndexCreate() {
createIndex(INDEX_NAME);
verify(indexBuilder, never()).scheduleBuildIndex(
+ eq(zoneId(TABLE_NAME)),
Review Comment:
Will several zones be checked here? If not, then I suggest making the
`zoneId()` method similar to `tableId()`.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java:
##########
@@ -117,7 +119,7 @@ public void scheduleBuildIndex(
return;
}
- IndexBuildTaskId taskId = new IndexBuildTaskId(tableId,
partitionId, indexId);
+ IndexBuildTaskId taskId = new IndexBuildTaskId(zoneId, tableId,
partitionId, indexId);
Review Comment:
```suggestion
var taskId = new IndexBuildTaskId(zoneId, tableId, partitionId,
indexId);
```
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java:
##########
@@ -174,7 +178,7 @@ public void scheduleBuildIndexAfterDisasterRecovery(
return;
}
- IndexBuildTaskId taskId = new IndexBuildTaskId(tableId,
partitionId, indexId);
+ IndexBuildTaskId taskId = new IndexBuildTaskId(zoneId, tableId,
partitionId, indexId);
Review Comment:
```suggestion
var taskId = new IndexBuildTaskId(zoneId, tableId, partitionId,
indexId);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]