denis-chudov commented on code in PR #1871:
URL: https://github.com/apache/ignite-3/pull/1871#discussion_r1156097355


##########
modules/placement-driver/build.gradle:
##########
@@ -22,8 +22,9 @@ apply from: "$rootDir/buildscripts/java-integration-test.gradle"
 apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
 
 dependencies {
-    implementation project(':ignite-metastorage-api')
+    api project(':ignite-placement-driver-api')

Review Comment:
   Why `api` instead of `implementation` and `integrationTestImplementation`? It seems `api` is not needed here.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -97,6 +101,22 @@ public void startTrack() {
         LOG.info("Leases cache recovered [leases={}]", leases);
     }
 
+    /**
+     * Subscribes to notifications.
+     *
+     * @param closure Closure to notify about an adding / removing a lease.
+     */
+    public void subscribeLeaseAdded(LeaseUpdateClosure closure) {
+        this.updateClosure = closure;
+    }
+
+    /**
+     * Unsubscribes for notification.
+     */
+    public void unsubscribeLeaseAdded() {
+        this.updateClosure = null;
+    }
+

Review Comment:
   Two verbs in the method name are confusing, and the closure is called not only on lease addition. How about renaming the methods to `subscribeOnLeaseUpdate` and `unsubscribeOnLeaseUpdate`?
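
   The rename suggested above could look roughly like the sketch below. Only the two method names come from the comment; `LeaseUpdateListener`, `LeaseTrackerSketch`, and `fireLeaseUpdate` are hypothetical stand-ins for illustration, not the PR's actual types, and plain strings replace the real group id and node classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the PR's LeaseUpdateClosure. */
interface LeaseUpdateListener {
    /** Called for any lease change; a null leaseholder means the lease was removed. */
    void onLeaseUpdate(String groupId, String leaseholder);
}

/** Hypothetical, simplified tracker; not the real LeaseTracker. */
class LeaseTrackerSketch {
    /** Volatile because the subscribing and the notifying threads may differ. */
    private volatile LeaseUpdateListener listener;

    /** Single verb pair: subscribe / unsubscribe on a lease update of any kind. */
    void subscribeOnLeaseUpdate(LeaseUpdateListener listener) {
        this.listener = listener;
    }

    void unsubscribeOnLeaseUpdate() {
        this.listener = null;
    }

    /** Notifies the listener, if any, about an added, changed, or removed lease. */
    void fireLeaseUpdate(String groupId, String leaseholder) {
        LeaseUpdateListener local = listener; // Read once to avoid racing with unsubscribe.

        if (local != null) {
            local.onLeaseUpdate(groupId, leaseholder);
        }
    }
}

class LeaseTrackerSketchDemo {
    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        LeaseTrackerSketch tracker = new LeaseTrackerSketch();

        tracker.subscribeOnLeaseUpdate((grp, holder) -> events.add(grp + "->" + holder));
        tracker.fireLeaseUpdate("grp1", "nodeA");

        tracker.unsubscribeOnLeaseUpdate();
        tracker.fireLeaseUpdate("grp1", null); // Ignored: nobody is subscribed.

        System.out.println(events);
    }
}
```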



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+    /** The object is used when nothing holds the lease. */
+    public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+    /** A node that holds a lease until {@code stopLeas}. */
+    private final ClusterNode leaseholder;
+
+    /** The lease is accepted, when the holder knows about it and applies all related obligations. */
+    private final boolean accepted;
+
+    /** Lease start timestamp. The timestamp is assigned when the lease created and does not be changed when the lease is prolonged. */
+    private final HybridTimestamp startTime;
+
+    /** Timestamp to expiration the lease. */
+    private final HybridTimestamp expirationTime;
+
+    /**
+     * Creates a new lease.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     */
+    public Lease(ClusterNode leaseholder, HybridTimestamp startTime, HybridTimestamp leaseExpirationTime) {
+        this(leaseholder, startTime, leaseExpirationTime, false);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     * @param accepted The flag is true when the holder accepted the lease, the false otherwise.
+     */
+    private Lease(ClusterNode leaseholder, HybridTimestamp startTime, HybridTimestamp leaseExpirationTime, boolean accepted) {
+        this.leaseholder = leaseholder;
+        this.expirationTime = leaseExpirationTime;
+        this.startTime = startTime;
+        this.accepted = accepted;
+    }
+
+    /**
+     * Prolongs a lease to until a new timestamp. Only an accepted lease available to prolong.

Review Comment:
   ```suggestion
        * Prolongs a lease until a new timestamp. Only an accepted lease can be prolonged.
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+    /** The object is used when nothing holds the lease. */
+    public static Lease EMPTY_LEASE = new Lease(null, null, null, true);

Review Comment:
   Why is the empty lease accepted?



##########
modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java:
##########
@@ -321,6 +449,45 @@ private void checkLeaseCreated(TablePartitionId grpPartId) throws InterruptedExc
         }, 10_000));
     }
 
+    /**
+     * Checks that the lease for the group is accepted.
+     *
+     * @param grpPartId Replication group id.
+     * @throws InterruptedException If the waiting is interrupted.
+     */
+    private void checkAccepted(TablePartitionId grpPartId) throws InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            var leaseFut = metaStorageManager.get(fromString(PLACEMENTDRIVER_PREFIX + grpPartId));
+
+            var leaseEntry = leaseFut.join();
+
+            if (leaseEntry != null && !leaseEntry.empty()) {
+                Lease lease = ByteUtils.fromBytes(leaseEntry.value());
+
+                return lease.isAccepted();
+            }
+
+            return false;
+        }, 10_000));

Review Comment:
   Shouldn't `readLease` be used here, as well as in `checkLeaseCreated`?



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+    /** The object is used when nothing holds the lease. */
+    public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+    /** A node that holds a lease until {@code stopLeas}. */
+    private final ClusterNode leaseholder;
+
+    /** The lease is accepted, when the holder knows about it and applies all related obligations. */
+    private final boolean accepted;
+
+    /** Lease start timestamp. The timestamp is assigned when the lease created and does not be changed when the lease is prolonged. */

Review Comment:
   ```suggestion
       /** Lease start timestamp. The timestamp is assigned when the lease is created and is not changed when the lease is prolonged. */
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+    /** The object is used when nothing holds the lease. */
+    public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+    /** A node that holds a lease until {@code stopLeas}. */
+    private final ClusterNode leaseholder;
+
+    /** The lease is accepted, when the holder knows about it and applies all related obligations. */
+    private final boolean accepted;
+
+    /** Lease start timestamp. The timestamp is assigned when the lease created and does not be changed when the lease is prolonged. */
+    private final HybridTimestamp startTime;
+
+    /** Timestamp to expiration the lease. */
+    private final HybridTimestamp expirationTime;
+
+    /**
+     * Creates a new lease.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     */
+    public Lease(ClusterNode leaseholder, HybridTimestamp startTime, HybridTimestamp leaseExpirationTime) {
+        this(leaseholder, startTime, leaseExpirationTime, false);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     * @param accepted The flag is true when the holder accepted the lease, the false otherwise.
+     */
+    private Lease(ClusterNode leaseholder, HybridTimestamp startTime, HybridTimestamp leaseExpirationTime, boolean accepted) {
+        this.leaseholder = leaseholder;
+        this.expirationTime = leaseExpirationTime;
+        this.startTime = startTime;
+        this.accepted = accepted;
+    }
+
+    /**
+     * Prolongs a lease to until a new timestamp. Only an accepted lease available to prolong.
+     *
+     * @param to The new lease expiration timestamp.
+     * @return A new lease which will have the same properties except expire timestamp.
+     */
+    public Lease prolongLease(HybridTimestamp to) {
+        assert accepted : "The lease should be accepted by leaseholder before prolong ["

Review Comment:
   ```suggestion
           assert accepted : "The lease should be accepted by leaseholder before prolongation ["
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -231,6 +303,47 @@ private void updateLeaseInMetaStorage(ReplicationGroupId grpId, Lease lease, Clu
             );
         }
 
+        /**
+         * Writes a prolong lease in Meta storage.
+         *
+         * @param grpId Replication group id.
+         * @param lease Lease to prolong.
+         */
+        private void prolongLeasInMetaStorage(ReplicationGroupId grpId, Lease lease) {
+            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+            var newTs = new HybridTimestamp(clock.now().getPhysical() + LEASE_PERIOD, 0);
+
+            byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+            Lease renewedLease = lease.prolongLease(newTs);
+
+            msManager.invoke(
+                    value(leaseKey).eq(leaseRaw),
+                    put(leaseKey, ByteUtils.toBytes(renewedLease)),
+                    noop()
+            );
+        }
+
+        /**
+         * Writes an accepted lease in Meta storage.
+         *
+         * @param grpId Replication group id.
+         * @param lease Lease to accept.
+         */
+        private void acceptLeasInMetaStorage(ReplicationGroupId grpId, Lease lease) {

Review Comment:
   ```suggestion
           private void acceptLeaseInMetaStorage(ReplicationGroupId grpId, Lease lease) {
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+    /** The object is used when nothing holds the lease. */
+    public static Lease EMPTY_LEASE = new Lease(null, null, null, true);
+
+    /** A node that holds a lease until {@code stopLeas}. */
+    private final ClusterNode leaseholder;
+
+    /** The lease is accepted, when the holder knows about it and applies all related obligations. */
+    private final boolean accepted;
+
+    /** Lease start timestamp. The timestamp is assigned when the lease created and does not be changed when the lease is prolonged. */
+    private final HybridTimestamp startTime;
+
+    /** Timestamp to expiration the lease. */
+    private final HybridTimestamp expirationTime;
+
+    /**
+     * Creates a new lease.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     */
+    public Lease(ClusterNode leaseholder, HybridTimestamp startTime, HybridTimestamp leaseExpirationTime) {
+        this(leaseholder, startTime, leaseExpirationTime, false);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     * @param accepted The flag is true when the holder accepted the lease, the false otherwise.
+     */
+    private Lease(ClusterNode leaseholder, HybridTimestamp startTime, HybridTimestamp leaseExpirationTime, boolean accepted) {
+        this.leaseholder = leaseholder;
+        this.expirationTime = leaseExpirationTime;
+        this.startTime = startTime;
+        this.accepted = accepted;
+    }
+
+    /**
+     * Prolongs a lease to until a new timestamp. Only an accepted lease available to prolong.
+     *
+     * @param to The new lease expiration timestamp.
+     * @return A new lease which will have the same properties except expire timestamp.

Review Comment:
   ```suggestion
        * @return A new lease which will have the same properties except for the expiration timestamp.
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -179,24 +240,32 @@ public void run() {
 
                     Lease lease = leaseTracker.getLease(grpId);
 
+                    if (!lease.isAccepted()) {
+                        LeaseAgreement agreement = leaseConciliator.conciliated(grpId);
+
+                        if (agreement.isAccepted()) {
+                            acceptLeasInMetaStorage(grpId, lease);
+                        }
+                    }

Review Comment:
   There should be a check that the agreement contains the same lease.
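
   A minimal sketch of such a check, under stated assumptions: `LeaseSketch`, `AgreementSketch`, and `canAccept` are hypothetical, simplified stand-ins, and it is only an assumption that the agreement exposes the lease it was negotiated for. The leases are matched by leaseholder and start time, since the diff's Javadoc says the start timestamp is not changed when a lease is prolonged.

```java
import java.util.Objects;

/** Hypothetical, simplified lease; the real class uses ClusterNode and HybridTimestamp. */
final class LeaseSketch {
    final String leaseholder;
    final long startTime;
    final boolean accepted;

    LeaseSketch(String leaseholder, long startTime, boolean accepted) {
        this.leaseholder = leaseholder;
        this.startTime = startTime;
        this.accepted = accepted;
    }
}

/** Hypothetical agreement that remembers which lease it was negotiated for. */
final class AgreementSketch {
    final LeaseSketch lease;
    final boolean accepted;

    AgreementSketch(LeaseSketch lease, boolean accepted) {
        this.lease = lease;
        this.accepted = accepted;
    }
}

final class AgreementCheck {
    /** Returns true only if the accepted agreement refers to the very lease being updated. */
    static boolean canAccept(LeaseSketch current, AgreementSketch agreement) {
        return !current.accepted
                && agreement != null
                && agreement.accepted
                && agreement.lease != null
                && Objects.equals(agreement.lease.leaseholder, current.leaseholder)
                && agreement.lease.startTime == current.startTime;
    }

    public static void main(String[] args) {
        LeaseSketch current = new LeaseSketch("nodeA", 100L, false);

        AgreementSketch matching = new AgreementSketch(new LeaseSketch("nodeA", 100L, false), true);
        AgreementSketch stale = new AgreementSketch(new LeaseSketch("nodeB", 50L, false), true);

        System.out.println(canAccept(current, matching));
        System.out.println(canAccept(current, stale));
    }
}
```

   With a guard like this, an agreement left over from a previous lease of the same group would not cause the current lease to be marked as accepted.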



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -146,18 +172,53 @@ public void deactivate() {
 
             updaterTread = null;
         }
+
+        leaseTracker.unsubscribeLeaseAdded();
+
+        leaseConciliator = null;
     }
 
     /**
      * Finds a node that can be the leaseholder.
      *
+     * @param groupId Replication group id.
      * @param assignments Replication group assignment.
      * @return Cluster node, or {@code null} if no node in assignments can be the leaseholder.
      */
-    private ClusterNode nextLeaseHolder(Set<Assignment> assignments) {
+    private ClusterNode nextLeaseHolder(ReplicationGroupId groupId, Set<Assignment> assignments) {
         //TODO: IGNITE-18879 Implement more intellectual algorithm to choose a node.
+        LeaseAgreement agreement = leaseConciliator.conciliated(groupId);
+
+        if (agreement.getRedirectTo() != null) {
+            boolean hasInAssignments = false;
+
+            for (Assignment assignment : assignments) {
+                if (agreement.getRedirectTo().equals(assignment.consistentId())) {
+                    hasInAssignments = true;
+
+                    break;
+                }
+            }
+
+            if (hasInAssignments) {
+                ClusterNode candidate = topologyTracker.nodeByConsistentId(agreement.getRedirectTo());
+
+                if (candidate != null) {
+                    return candidate;
+                }
+            }
+        }

Review Comment:
   I strongly doubt that this should be handled here. I would point to the interface that I suggested in https://issues.apache.org/jira/browse/IGNITE-18879 : at this stage we need only a candidate for a group, without considering the redirect proposal. Moreover, I see no reason to store an intermediate state with the redirect; we only need to remember that we have chosen the candidate (and that the candidate was saved to meta storage), and the redirect proposal can be handled reactively.
   
   Additionally (and this is not only about this part of the code), the implementation seems too complicated to me and definitely lacks reactiveness. We have an updater scheduler, a map with leases, a map with lease agreements, metastorage update listeners that can change these maps, several intermediate states, etc. We would need a pretty long explanation just to describe all of the logic contained here. I suggest thinking about how to simplify it.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -231,6 +303,47 @@ private void updateLeaseInMetaStorage(ReplicationGroupId grpId, Lease lease, Clu
             );
         }
 
+        /**
+         * Writes a prolong lease in Meta storage.
+         *
+         * @param grpId Replication group id.
+         * @param lease Lease to prolong.
+         */
+        private void prolongLeasInMetaStorage(ReplicationGroupId grpId, Lease lease) {

Review Comment:
   ```suggestion
           private void prolongLeaseInMetaStorage(ReplicationGroupId grpId, Lease lease) {
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -242,7 +355,7 @@ private boolean isReplicationGroupUpdateLeaseholder(Lease lease, ClusterNode can
             HybridTimestamp now = clock.now();
 
             return lease == EMPTY_LEASE
-                    || (!candidate.equals(lease.getLeaseholder()) && now.after(lease.getLeaseExpirationTime()));
+                    || (!lease.isAccepted() && now.after(lease.getExpirationTime()));

Review Comment:
   Why is the `candidate` parameter still needed?
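
   Since the new condition no longer reads `candidate`, the check could take only the lease and the current time. The sketch below mirrors the condition from the diff; `SimpleLease`, `needsNewLeaseholder`, and the use of `long` timestamps in place of `HybridTimestamp` are assumptions made for illustration.

```java
final class LeaseholderUpdateCheck {
    /** Hypothetical, simplified lease with a plain long as the expiration timestamp. */
    static final class SimpleLease {
        final boolean accepted;
        final long expirationTime;

        SimpleLease(boolean accepted, long expirationTime) {
            this.accepted = accepted;
            this.expirationTime = expirationTime;
        }
    }

    /** Sentinel compared by identity; the flag mirrors the diff and is irrelevant to the check. */
    static final SimpleLease EMPTY_LEASE = new SimpleLease(true, 0L);

    /** Same condition as in the diff, without the unused candidate parameter. */
    static boolean needsNewLeaseholder(SimpleLease lease, long now) {
        return lease == EMPTY_LEASE || (!lease.accepted && now > lease.expirationTime);
    }

    public static void main(String[] args) {
        System.out.println(needsNewLeaseholder(EMPTY_LEASE, 10L));
        System.out.println(needsNewLeaseholder(new SimpleLease(false, 5L), 10L));
        System.out.println(needsNewLeaseholder(new SimpleLease(true, 5L), 10L));
    }
}
```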



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/conciliation/LeaseConciliator.java:
##########
@@ -0,0 +1,122 @@
+package org.apache.ignite.internal.placementdriver.conciliation;
+
+import static org.apache.ignite.internal.placementdriver.LeaseUpdater.LEASE_PERIOD;
+import static org.apache.ignite.internal.placementdriver.conciliation.LeaseAgreement.UNDEFINED_AGREEMENT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeaseUpdater;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * This class conciliates a lease with leaseholder. If the lease is conciliated, it is ready available to accept.
+ */
+public class LeaseConciliator {

Review Comment:
   The word "conciliator" confused me a lot :) Conciliation implies that there was some conflict between parties that needs to be resolved. I suggest removing this word everywhere, as it has a different meaning. In fact, this class just sends LeaseGrantMessage and holds a map; are you sure that it is actually needed?


