This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c97d7c5db3 IGNITE-21247 Log enhancements for LeaseUpdater (#3109)
c97d7c5db3 is described below

commit c97d7c5db327583b82eeca1165cb4eea21a3b3a2
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Wed Feb 7 12:13:06 2024 +0300

    IGNITE-21247 Log enhancements for LeaseUpdater (#3109)
---
 .../internal/tostring/IgniteToStringBuilder.java   | 52 +++++++++++-
 .../internal/placementdriver/LeaseUpdater.java     | 99 +++++++++++++++++++++-
 .../internal/table/distributed/TableManager.java   | 19 ++++-
 3 files changed, 163 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
index 86a0e052db..6d8c88d6b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
@@ -54,6 +54,7 @@ import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.lang.IgniteTriConsumer;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -1450,6 +1451,53 @@ public class IgniteToStringBuilder {
         }
     }
 
+    /**
+     * Produces a string representation of a list with customized string representation of elements.
+     *
+     * @param list List.
+     * @param elementToString Element to string transformer, accepts the string builder, element of list and its index.
+     * @param <T> Type of list elements.
+     * @return String.
+     */
+    public static <T> String toString(List<T> list, IgniteTriConsumer<IgniteStringBuilder, T, Integer> elementToString) {
+        int listSize = list.size();
+
+        IgniteStringBuilder buf = new IgniteStringBuilder();
+
+        buf.app(" [");
+
+        int cnt = 0;
+        boolean needHandleOverflow = true;
+
+        try {
+            for (int i = 0; i < list.size(); i++) {
+                if (i > 0) {
+                    buf.app(',');
+                }
+
+                T el = list.get(i);
+
+                elementToString.accept(buf, el, i);
+
+                if (++cnt == COLLECTION_LIMIT || cnt == listSize) {
+                    break;
+                }
+            }
+        } catch (ConcurrentModificationException e) {
+            handleConcurrentModification(buf, cnt, listSize);
+
+            needHandleOverflow = false;
+        }
+
+        if (needHandleOverflow) {
+            handleOverflow(buf, listSize);
+        }
+
+        buf.app(']');
+
+        return buf.toString();
+    }
+
     /**
      * Writes array to buffer.
      *
@@ -1583,7 +1631,7 @@ public class IgniteToStringBuilder {
      * @param buf  String builder buffer.
      * @param size Size to compare with limit.
      */
-    private static void handleOverflow(StringBuilderLimitedLength buf, int size) {
+    private static void handleOverflow(IgniteStringBuilder buf, int size) {
         int overflow = size - COLLECTION_LIMIT;
 
         if (overflow > 0) {
@@ -1598,7 +1646,7 @@ public class IgniteToStringBuilder {
      * @param writtenElements Number of elements successfully written to 
output.
      * @param size            Overall size of collection.
      */
-    private static void handleConcurrentModification(StringBuilderLimitedLength buf, int writtenElements, int size) {
+    private static void handleConcurrentModification(IgniteStringBuilder buf, int writtenElements, int size) {
         buf.app("... concurrent modification was detected, ").app(writtenElements).app(" out of ").app(size)
                 .app(" were written");
     }
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index bf471c266c..6d0762659b 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -56,6 +56,8 @@ import org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement;
 import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -64,6 +66,10 @@ import org.jetbrains.annotations.Nullable;
  * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
  */
 public class LeaseUpdater {
+    /** Negative value means that printing statistics is disabled. */
+    private static final int LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS = IgniteSystemProperties
+            .getInteger("LEASE_STATISTICS_PRINT_ONCE_PER_ITERATIONS", 10);
+
     /** Ignite logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(LeaseUpdater.class);
 
@@ -275,6 +281,11 @@ public class LeaseUpdater {
 
     /** Runnable to update lease in Meta storage. */
     private class Updater implements Runnable {
+        private LeaseStats leaseUpdateStatistics = new LeaseStats();
+
+        /** This field should be accessed only from updater thread. */
+        private int statisticsLogCounter;
+
         @Override
         public void run() {
             while (active() && !Thread.interrupted()) {
@@ -286,6 +297,13 @@ public class LeaseUpdater {
                     if (active()) {
                         updateLeaseBatchInternal();
                     }
+                } catch (Throwable e) {
+                    LOG.error("Error occurred when updating the leases.", e);
+
+                    if (e instanceof Error) {
+                        // TODO IGNITE-20368 The node should be halted in case 
of an error here.
+                        throw (Error) e;
+                    }
                 } finally {
                     stateChangingLock.leaveBusy();
                 }
@@ -302,6 +320,8 @@ public class LeaseUpdater {
         private void updateLeaseBatchInternal() {
             HybridTimestamp now = clock.now();
 
+            leaseUpdateStatistics = new LeaseStats();
+
             long outdatedLeaseThreshold = now.getPhysical() + LEASE_INTERVAL / 
2;
 
             Leases leasesCurrent = leaseTracker.leasesCurrent();
@@ -315,11 +335,18 @@ public class LeaseUpdater {
             renewedLeases.entrySet().removeIf(e -> 
e.getValue().getExpirationTime().before(now)
                     && 
!currentAssignmentsReplicationGroupIds.contains(e.getKey()));
 
+            int currentAssignmentsSize = currentAssignments.size();
+            int activeLeasesCount = 0;
+
             for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
currentAssignments.entrySet()) {
                 ReplicationGroupId grpId = entry.getKey();
 
                 Lease lease = leaseTracker.getLease(grpId);
 
+                if (lease.isAccepted() && !isLeaseOutdated(lease)) {
+                    activeLeasesCount++;
+                }
+
                 if (!lease.isAccepted()) {
                     LeaseAgreement agreement = 
leaseNegotiator.negotiated(grpId);
 
@@ -332,6 +359,8 @@ public class LeaseUpdater {
                         ClusterNode candidate = 
nextLeaseHolder(entry.getValue(), agreement.getRedirectTo());
 
                         if (candidate == null) {
+                            leaseUpdateStatistics.onLeaseWithoutCandidate();
+
                             continue;
                         }
 
@@ -350,6 +379,8 @@ public class LeaseUpdater {
                     );
 
                     if (candidate == null) {
+                        leaseUpdateStatistics.onLeaseWithoutCandidate();
+
                         continue;
                     }
 
@@ -368,7 +399,18 @@ public class LeaseUpdater {
 
             byte[] renewedValue = new 
LeaseBatch(renewedLeases.values()).bytes();
 
-            var key = PLACEMENTDRIVER_LEASES_KEY;
+            ByteArray key = PLACEMENTDRIVER_LEASES_KEY;
+
+            if (shouldLogLeaseStatistics()) {
+                LOG.info(
+                        "Leases updated (printed once per {} iteration(s)): [inCurrentIteration={}, active={}, "
+                                + "currentAssignmentsSize={}].",
+                        LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS,
+                        leaseUpdateStatistics,
+                        activeLeasesCount,
+                        currentAssignmentsSize
+                );
+            }
 
             msManager.invoke(
                     or(notExists(key), 
value(key).eq(leasesCurrent.leasesBytes())),
@@ -416,6 +458,8 @@ public class LeaseUpdater {
             renewedLeases.put(grpId, renewedLease);
 
             toBeNegotiated.put(grpId, !lease.isAccepted() && 
Objects.equals(lease.getLeaseholder(), candidate.name()));
+
+            leaseUpdateStatistics.onLeaseCreate();
         }
 
         /**
@@ -430,6 +474,8 @@ public class LeaseUpdater {
             Lease renewedLease = lease.prolongLease(newTs);
 
             renewedLeases.put(grpId, renewedLease);
+
+            leaseUpdateStatistics.onLeaseProlong();
         }
 
         /**
@@ -445,6 +491,8 @@ public class LeaseUpdater {
             Lease renewedLease = lease.acceptLease(newTs);
 
             renewedLeases.put(grpId, renewedLease);
+
+            leaseUpdateStatistics.onLeasePublish();
         }
 
         /**
@@ -459,6 +507,55 @@ public class LeaseUpdater {
 
             return now.after(lease.getExpirationTime());
         }
+
+        private boolean shouldLogLeaseStatistics() {
+            if (LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS < 0) {
+                return false;
+            }
+
+            boolean result = ++statisticsLogCounter > LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS;
+
+            if (result) {
+                statisticsLogCounter = 0;
+            }
+
+            return result;
+        }
+    }
+
+    private static class LeaseStats {
+        @IgniteToStringInclude
+        int leasesCreated;
+
+        @IgniteToStringInclude
+        int leasesPublished;
+
+        @IgniteToStringInclude
+        int leasesProlonged;
+
+        @IgniteToStringInclude
+        int leasesWithoutCandidates;
+
+        private void onLeaseCreate() {
+            leasesCreated++;
+        }
+
+        private void onLeasePublish() {
+            leasesPublished++;
+        }
+
+        private void onLeaseProlong() {
+            leasesProlonged++;
+        }
+
+        private void onLeaseWithoutCandidate() {
+            leasesWithoutCandidates++;
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
     }
 
     /** Message handler to process notification from replica side. */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 46fd7f31ab..f3c58b4d49 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -170,6 +170,7 @@ import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
 import 
org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -652,8 +653,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                     .invoke(condition, partitionAssignments, 
Collections.emptyList())
                     .thenCompose(invokeResult -> {
                         if (invokeResult) {
-                            LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully "
-                                            + "written to meta storage [tableId={}, assignments={}]", tableId, newAssignments));
+                            LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully written"
+                                    + " to meta storage [tableId={}, assignments={}]", tableId, assignmentListToString(newAssignments)));
 
                             return completedFuture(newAssignments);
                         } else {
@@ -679,7 +680,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                                 }
 
                                 
LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage 
[tableId={}, "
-                                        + "assignments={}]", tableId, 
realAssignments));
+                                        + "assignments={}]", tableId, 
assignmentListToString(realAssignments)));
 
                                 return realAssignments;
                             });
@@ -1167,7 +1168,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
                 assignmentsFuture.thenAccept(assignmentsList -> {
                     LOG.info(IgniteStringFormatter.format("Assignments 
calculated from data nodes [table={}, tableId={}, assignments={}, "
-                                    + "revision={}]", tableDescriptor.name(), 
tableId, assignmentsList, causalityToken));
+                            + "revision={}]", tableDescriptor.name(), tableId, 
assignmentListToString(assignmentsList), causalityToken));
                 });
             }
 
@@ -1303,6 +1304,16 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         return createPartsFut.thenApply(ignore -> null);
     }
 
+    /**
+     * Creates a string representation of the given assignments list to use it for logging.
+     *
+     * @param assignments List of assignments.
+     * @return String representation of the given assignments list to use it for logging.
+     */
+    private static String assignmentListToString(List<Set<Assignment>> assignments) {
+        return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e));
+    }
+
     /**
      * Creates data storage for the provided table.
      *

Reply via email to