denis-chudov commented on code in PR #1871:
URL: https://github.com/apache/ignite-3/pull/1871#discussion_r1156097355
##########
modules/placement-driver/build.gradle:
##########
@@ -22,8 +22,9 @@ apply from:
"$rootDir/buildscripts/java-integration-test.gradle"
apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
dependencies {
- implementation project(':ignite-metastorage-api')
+ api project(':ignite-placement-driver-api')
Review Comment:
Why `api` instead of `implementation` and `integrationTestImplementation`?
It seems `api` is not needed here.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -97,6 +101,22 @@ public void startTrack() {
LOG.info("Leases cache recovered [leases={}]", leases);
}
+ /**
+ * Subscribes to notifications.
+ *
+ * @param closure Closure to notify about an adding / removing a lease.
+ */
+ public void subscribeLeaseAdded(LeaseUpdateClosure closure) {
+ this.updateClosure = closure;
+ }
+
+ /**
+ * Unsubscribes for notification.
+ */
+ public void unsubscribeLeaseAdded() {
+ this.updateClosure = null;
+ }
+
Review Comment:
Two verbs in the method name are confusing, and the closure is notified about
more than lease addition. How about renaming these methods to
`subscribeOnLeaseUpdate` and `unsubscribeOnLeaseUpdate`?
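To illustrate the rename, here is a minimal self-contained sketch. It is not the real `LeaseTracker`: `LeaseTrackerSketch`, `fireLeaseUpdate` and the `update(String)` signature of the closure are assumptions made only for this example, and the actual `LeaseUpdateClosure` in the PR may look different.

```java
/** Stand-in for the PR's closure type; the method signature is an assumption. */
interface LeaseUpdateClosure {
    void update(String groupId);
}

/** Simplified tracker that shows only the renamed subscription methods. */
class LeaseTrackerSketch {
    /** Volatile on the assumption that subscription and notification run on different threads. */
    private volatile LeaseUpdateClosure updateClosure;

    /** Subscribes to any lease update (added, prolonged, removed). */
    public void subscribeOnLeaseUpdate(LeaseUpdateClosure closure) {
        this.updateClosure = closure;
    }

    /** Drops the subscription. */
    public void unsubscribeOnLeaseUpdate() {
        this.updateClosure = null;
    }

    /** Hypothetical notification point, called for every kind of lease change. */
    void fireLeaseUpdate(String groupId) {
        // Read the field once so that a concurrent unsubscribe cannot cause an NPE.
        LeaseUpdateClosure closure = updateClosure;

        if (closure != null) {
            closure.update(groupId);
        }
    }
}
```

With these names the call sites read as "subscribe on lease update", which matches what the closure is actually notified about.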
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. */
+ public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+ /** A node that holds a lease until {@code stopLeas}. */
+ private final ClusterNode leaseholder;
+
+ /** The lease is accepted, when the holder knows about it and applies all
related obligations. */
+ private final boolean accepted;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
created and does not be changed when the lease is prolonged. */
+ private final HybridTimestamp startTime;
+
+ /** Timestamp to expiration the lease. */
+ private final HybridTimestamp expirationTime;
+
+ /**
+ * Creates a new lease.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ */
+ public Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime) {
+ this(leaseholder, startTime, leaseExpirationTime, false);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ * @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
+ */
+ private Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime, boolean accepted) {
+ this.leaseholder = leaseholder;
+ this.expirationTime = leaseExpirationTime;
+ this.startTime = startTime;
+ this.accepted = accepted;
+ }
+
+ /**
+ * Prolongs a lease to until a new timestamp. Only an accepted lease
available to prolong.
Review Comment:
```suggestion
* Prolongs a lease until a new timestamp. Only an accepted lease can be prolonged.
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. */
+ public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
Review Comment:
Why is the empty lease accepted?
##########
modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java:
##########
@@ -321,6 +449,45 @@ private void checkLeaseCreated(TablePartitionId grpPartId)
throws InterruptedExc
}, 10_000));
}
+ /**
+ * Checks that the lease for the group is accepted.
+ *
+ * @param grpPartId Replication group id.
+ * @throws InterruptedException If the waiting is interrupted.
+ */
+ private void checkAccepted(TablePartitionId grpPartId) throws
InterruptedException {
+ assertTrue(waitForCondition(() -> {
+ var leaseFut =
metaStorageManager.get(fromString(PLACEMENTDRIVER_PREFIX + grpPartId));
+
+ var leaseEntry = leaseFut.join();
+
+ if (leaseEntry != null && !leaseEntry.empty()) {
+ Lease lease = ByteUtils.fromBytes(leaseEntry.value());
+
+ return lease.isAccepted();
+ }
+
+ return false;
+ }, 10_000));
Review Comment:
Shouldn't `readLease` be used here, as well as in `checkLeaseCreated`?
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. */
+ public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+ /** A node that holds a lease until {@code stopLeas}. */
+ private final ClusterNode leaseholder;
+
+ /** The lease is accepted, when the holder knows about it and applies all
related obligations. */
+ private final boolean accepted;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
created and does not be changed when the lease is prolonged. */
Review Comment:
```suggestion
/** Lease start timestamp. The timestamp is assigned when the lease is created and is not changed when the lease is prolonged. */
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. */
+ public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+ /** A node that holds a lease until {@code stopLeas}. */
+ private final ClusterNode leaseholder;
+
+ /** The lease is accepted, when the holder knows about it and applies all
related obligations. */
+ private final boolean accepted;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
created and does not be changed when the lease is prolonged. */
+ private final HybridTimestamp startTime;
+
+ /** Timestamp to expiration the lease. */
+ private final HybridTimestamp expirationTime;
+
+ /**
+ * Creates a new lease.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ */
+ public Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime) {
+ this(leaseholder, startTime, leaseExpirationTime, false);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ * @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
+ */
+ private Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime, boolean accepted) {
+ this.leaseholder = leaseholder;
+ this.expirationTime = leaseExpirationTime;
+ this.startTime = startTime;
+ this.accepted = accepted;
+ }
+
+ /**
+ * Prolongs a lease to until a new timestamp. Only an accepted lease
available to prolong.
+ *
+ * @param to The new lease expiration timestamp.
+ * @return A new lease which will have the same properties except expire
timestamp.
+ */
+ public Lease prolongLease(HybridTimestamp to) {
+ assert accepted : "The lease should be accepted by leaseholder before
prolong ["
Review Comment:
```suggestion
assert accepted : "The lease should be accepted by leaseholder before prolongation ["
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -231,6 +303,47 @@ private void updateLeaseInMetaStorage(ReplicationGroupId
grpId, Lease lease, Clu
);
}
+ /**
+ * Writes a prolong lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to prolong.
+ */
+ private void prolongLeasInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
+ var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
+ var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+ Lease renewedLease = lease.prolongLease(newTs);
+
+ msManager.invoke(
+ value(leaseKey).eq(leaseRaw),
+ put(leaseKey, ByteUtils.toBytes(renewedLease)),
+ noop()
+ );
+ }
+
+ /**
+ * Writes an accepted lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to accept.
+ */
+ private void acceptLeasInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
Review Comment:
```suggestion
private void acceptLeaseInMetaStorage(ReplicationGroupId grpId, Lease lease) {
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. */
+ public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+ /** A node that holds a lease until {@code stopLeas}. */
+ private final ClusterNode leaseholder;
+
+ /** The lease is accepted, when the holder knows about it and applies all
related obligations. */
+ private final boolean accepted;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
created and does not be changed when the lease is prolonged. */
+ private final HybridTimestamp startTime;
+
+ /** Timestamp to expiration the lease. */
+ private final HybridTimestamp expirationTime;
+
+ /**
+ * Creates a new lease.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ */
+ public Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime) {
+ this(leaseholder, startTime, leaseExpirationTime, false);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ * @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
+ */
+ private Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime, boolean accepted) {
+ this.leaseholder = leaseholder;
+ this.expirationTime = leaseExpirationTime;
+ this.startTime = startTime;
+ this.accepted = accepted;
+ }
+
+ /**
+ * Prolongs a lease to until a new timestamp. Only an accepted lease
available to prolong.
+ *
+ * @param to The new lease expiration timestamp.
+ * @return A new lease which will have the same properties except expire
timestamp.
Review Comment:
```suggestion
* @return A new lease which will have the same properties except for the expiration timestamp.
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -179,24 +240,32 @@ public void run() {
Lease lease = leaseTracker.getLease(grpId);
+ if (!lease.isAccepted()) {
+ LeaseAgreement agreement =
leaseConciliator.conciliated(grpId);
+
+ if (agreement.isAccepted()) {
+ acceptLeasInMetaStorage(grpId, lease);
+ }
+ }
Review Comment:
There should be a check that the agreement contains the same lease.
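A rough sketch of the check I have in mind. All types here are simplified stand-ins, and `getLease()` on the agreement is a hypothetical accessor: I am assuming the agreement can expose the lease it was negotiated for, which may not match the current `LeaseAgreement` class.

```java
import java.util.Objects;

/** Simplified stand-in for the PR's Lease; only the fields needed for comparison. */
final class LeaseSketch {
    final String leaseholderId;
    final long startTime;

    LeaseSketch(String leaseholderId, long startTime) {
        this.leaseholderId = leaseholderId;
        this.startTime = startTime;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaseSketch)) {
            return false;
        }

        LeaseSketch other = (LeaseSketch) o;

        return startTime == other.startTime && Objects.equals(leaseholderId, other.leaseholderId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaseholderId, startTime);
    }
}

/** Stand-in agreement; getLease() is a hypothetical accessor. */
final class LeaseAgreementSketch {
    private final LeaseSketch lease;
    private final boolean accepted;

    LeaseAgreementSketch(LeaseSketch lease, boolean accepted) {
        this.lease = lease;
        this.accepted = accepted;
    }

    LeaseSketch getLease() {
        return lease;
    }

    boolean isAccepted() {
        return accepted;
    }
}

final class AcceptCheck {
    /** True only if the agreement is accepted and was made for exactly this lease. */
    static boolean canAccept(LeaseSketch current, LeaseAgreementSketch agreement) {
        return agreement.isAccepted() && Objects.equals(agreement.getLease(), current);
    }
}
```

Without the second condition, an agreement left over from a previous lease of the same group could mark a newer lease as accepted.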
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -146,18 +172,53 @@ public void deactivate() {
updaterTread = null;
}
+
+ leaseTracker.unsubscribeLeaseAdded();
+
+ leaseConciliator = null;
}
/**
* Finds a node that can be the leaseholder.
*
+ * @param groupId Replication group id.
* @param assignments Replication group assignment.
* @return Cluster node, or {@code null} if no node in assignments can be
the leaseholder.
*/
- private ClusterNode nextLeaseHolder(Set<Assignment> assignments) {
+ private ClusterNode nextLeaseHolder(ReplicationGroupId groupId,
Set<Assignment> assignments) {
//TODO: IGNITE-18879 Implement more intellectual algorithm to choose a
node.
+ LeaseAgreement agreement = leaseConciliator.conciliated(groupId);
+
+ if (agreement.getRedirectTo() != null) {
+ boolean hasInAssignments = false;
+
+ for (Assignment assignment : assignments) {
+ if
(agreement.getRedirectTo().equals(assignment.consistentId())) {
+ hasInAssignments = true;
+
+ break;
+ }
+ }
+
+ if (hasInAssignments) {
+ ClusterNode candidate =
topologyTracker.nodeByConsistentId(agreement.getRedirectTo());
+
+ if (candidate != null) {
+ return candidate;
+ }
+ }
+ }
Review Comment:
I strongly doubt that this should be handled here. See the interface that I
suggested in https://issues.apache.org/jira/browse/IGNITE-18879 : at this stage
we only need a candidate for a group, without considering the redirect
proposal. Moreover, I see no reason to store an intermediate state with the
redirect: we can just remember that we have chosen the candidate (and that the
candidate was saved to meta storage), and the redirect proposal can be handled
reactively.
Additionally (and this is not only about this part of the code), the
implementation seems too complicated to me and definitely lacks reactiveness.
We have an updater scheduler, a map with leases, a map with lease agreements,
metastorage update listeners that can change these maps, several intermediate
states, etc. It would take a pretty long explanation just to describe all of
the logic contained here. I suggest thinking about how to simplify it.
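To make the idea more concrete, a minimal sketch under stated assumptions. This is not the interface from the ticket, just an illustration: nodes are reduced to consistent id strings, `aliveNodes` stands in for the topology tracker, and `onRedirectProposal` is a hypothetical handler name.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

final class CandidateSelectorSketch {
    /** Consistent ids of nodes currently in the topology (stand-in for the topology tracker). */
    private final Set<String> aliveNodes;

    CandidateSelectorSketch(Set<String> aliveNodes) {
        this.aliveNodes = aliveNodes;
    }

    /** Picks a candidate from assignments and topology only; no redirect state is consulted. */
    Optional<String> nextLeaseHolder(Set<String> assignments) {
        // Sorted iteration keeps the choice deterministic for the same inputs.
        return new TreeSet<>(assignments).stream()
                .filter(aliveNodes::contains)
                .findFirst();
    }

    /** Reactive handling: the proposal is validated when it arrives and is never stored. */
    Optional<String> onRedirectProposal(Set<String> assignments, String proposedNode) {
        boolean valid = assignments.contains(proposedNode) && aliveNodes.contains(proposedNode);

        // An invalid proposal falls back to the regular selection.
        return valid ? Optional.of(proposedNode) : nextLeaseHolder(assignments);
    }
}
```

The point is that candidate selection stays a pure function of assignments and topology, while the redirect is processed at the moment the response arrives instead of being kept in a map for the next updater iteration.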
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -231,6 +303,47 @@ private void updateLeaseInMetaStorage(ReplicationGroupId
grpId, Lease lease, Clu
);
}
+ /**
+ * Writes a prolong lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to prolong.
+ */
+ private void prolongLeasInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
Review Comment:
```suggestion
private void prolongLeaseInMetaStorage(ReplicationGroupId grpId, Lease lease) {
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -242,7 +355,7 @@ private boolean isReplicationGroupUpdateLeaseholder(Lease
lease, ClusterNode can
HybridTimestamp now = clock.now();
return lease == EMPTY_LEASE
- || (!candidate.equals(lease.getLeaseholder()) &&
now.after(lease.getLeaseExpirationTime()));
+ || (!lease.isAccepted() &&
now.after(lease.getExpirationTime()));
Review Comment:
Why is the `candidate` parameter still needed?
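For reference, the new condition does not read the candidate at all, so the method could shrink to something like the sketch below. The types are stand-ins: `ExpiringLease` replaces `Lease`, and `HybridTimestamp` is reduced to a plain `long`, both assumptions made only to keep the example self-contained.

```java
/** Stand-in for the PR's Lease with timestamps reduced to plain longs. */
final class ExpiringLease {
    /** Marker instance used when nothing holds the lease. */
    static final ExpiringLease EMPTY = new ExpiringLease(false, 0L);

    final boolean accepted;
    final long expirationTime;

    ExpiringLease(boolean accepted, long expirationTime) {
        this.accepted = accepted;
        this.expirationTime = expirationTime;
    }
}

final class LeaseholderUpdateCheck {
    /** Same condition as in the diff, but without the candidate argument. */
    static boolean isReplicationGroupUpdateLeaseholder(ExpiringLease lease, long now) {
        return lease == ExpiringLease.EMPTY
                || (!lease.accepted && now > lease.expirationTime);
    }
}
```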
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/conciliation/LeaseConciliator.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.conciliation;
+
+import static
org.apache.ignite.internal.placementdriver.LeaseUpdater.LEASE_PERIOD;
+import static
org.apache.ignite.internal.placementdriver.conciliation.LeaseAgreement.UNDEFINED_AGREEMENT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeaseUpdater;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * This class conciliates a lease with leaseholder. If the lease is
conciliated, it is ready available to accept.
+ */
+public class LeaseConciliator {
Review Comment:
The word "conciliator" confused me a lot :) Conciliation implies that there
was some conflict between parties that needs to be resolved. I suggest removing
this word everywhere, as it has a different meaning. In fact, this class just
sends LeaseGrantMessage and holds a map. Are you sure that it is actually
needed?