This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c97d7c5db3 IGNITE-21247 Log enhancements for LeaseUpdater (#3109) c97d7c5db3 is described below commit c97d7c5db327583b82eeca1165cb4eea21a3b3a2 Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Wed Feb 7 12:13:06 2024 +0300 IGNITE-21247 Log enhancements for LeaseUpdater (#3109) --- .../internal/tostring/IgniteToStringBuilder.java | 52 +++++++++++- .../internal/placementdriver/LeaseUpdater.java | 99 +++++++++++++++++++++- .../internal/table/distributed/TableManager.java | 19 ++++- 3 files changed, 163 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java index 86a0e052db..6d8c88d6b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java @@ -54,6 +54,7 @@ import java.util.function.Supplier; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringBuilder; import org.apache.ignite.internal.lang.IgniteSystemProperties; +import org.apache.ignite.internal.lang.IgniteTriConsumer; import org.jetbrains.annotations.Nullable; /** @@ -1450,6 +1451,53 @@ public class IgniteToStringBuilder { } } + /** + * Produces a string representation of a list with customized string representation of elements. + * + * @param list List. + * @param elementToString Element to string transformer, accepts the string builder, element of list and its index. + * @param <T> Type of list elements. + * @return String. + */ + public static <T> String toString(List<T> list, IgniteTriConsumer<IgniteStringBuilder, T, Integer> elementToString) { + int listSize = list.size(); + + IgniteStringBuilder buf = new IgniteStringBuilder(); + + buf.app(" ["); + + int cnt = 0; + boolean needHandleOverflow = true; + + try { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + buf.app(','); + } + + T el = list.get(i); + + elementToString.accept(buf, el, i); + + if (++cnt == COLLECTION_LIMIT || cnt == listSize) { + break; + } + } + } catch (ConcurrentModificationException e) { + handleConcurrentModification(buf, cnt, listSize); + + needHandleOverflow = false; + } + + if (needHandleOverflow) { + handleOverflow(buf, listSize); + } + + buf.app(']'); + + return buf.toString(); + } + /** * Writes array to buffer. * @@ -1583,7 +1631,7 @@ public class IgniteToStringBuilder { * @param buf String builder buffer. * @param size Size to compare with limit. */ - private static void handleOverflow(StringBuilderLimitedLength buf, int size) { + private static void handleOverflow(IgniteStringBuilder buf, int size) { int overflow = size - COLLECTION_LIMIT; if (overflow > 0) { @@ -1598,7 +1646,7 @@ public class IgniteToStringBuilder { * @param writtenElements Number of elements successfully written to output. * @param size Overall size of collection. */ - private static void handleConcurrentModification(StringBuilderLimitedLength buf, int writtenElements, int size) { + private static void handleConcurrentModification(IgniteStringBuilder buf, int writtenElements, int size) { buf.app("... concurrent modification was detected, ").app(writtenElements).app(" out of ").app(size) .app(" were written"); } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java index bf471c266c..6d0762659b 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java @@ -56,6 +56,8 @@ import org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement; import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.IgniteThread; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -64,6 +66,10 @@ import org.jetbrains.annotations.Nullable; * A processor to manger leases. The process is started when placement driver activates and stopped when it deactivates. */ public class LeaseUpdater { + /** Negative value means that printing statistics is disabled. */ + private static final int LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS = IgniteSystemProperties + .getInteger("LEASE_STATISTICS_PRINT_ONCE_PER_ITERATIONS", 10); + /** Ignite logger. */ private static final IgniteLogger LOG = Loggers.forClass(LeaseUpdater.class); @@ -275,6 +281,11 @@ public class LeaseUpdater { /** Runnable to update lease in Meta storage. */ private class Updater implements Runnable { + private LeaseStats leaseUpdateStatistics = new LeaseStats(); + + /** This field should be accessed only from updater thread. */ + private int statisticsLogCounter; + @Override public void run() { while (active() && !Thread.interrupted()) { @@ -286,6 +297,13 @@ public class LeaseUpdater { if (active()) { updateLeaseBatchInternal(); } + } catch (Throwable e) { + LOG.error("Error occurred when updating the leases.", e); + + if (e instanceof Error) { + // TODO IGNITE-20368 The node should be halted in case of an error here. + throw (Error) e; + } } finally { stateChangingLock.leaveBusy(); } @@ -302,6 +320,8 @@ public class LeaseUpdater { private void updateLeaseBatchInternal() { HybridTimestamp now = clock.now(); + leaseUpdateStatistics = new LeaseStats(); + long outdatedLeaseThreshold = now.getPhysical() + LEASE_INTERVAL / 2; Leases leasesCurrent = leaseTracker.leasesCurrent(); @@ -315,11 +335,18 @@ public class LeaseUpdater { renewedLeases.entrySet().removeIf(e -> e.getValue().getExpirationTime().before(now) && !currentAssignmentsReplicationGroupIds.contains(e.getKey())); + int currentAssignmentsSize = currentAssignments.size(); + int activeLeasesCount = 0; + for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : currentAssignments.entrySet()) { ReplicationGroupId grpId = entry.getKey(); Lease lease = leaseTracker.getLease(grpId); + if (lease.isAccepted() && !isLeaseOutdated(lease)) { + activeLeasesCount++; + } + if (!lease.isAccepted()) { LeaseAgreement agreement = leaseNegotiator.negotiated(grpId); @@ -332,6 +359,8 @@ public class LeaseUpdater { ClusterNode candidate = nextLeaseHolder(entry.getValue(), agreement.getRedirectTo()); if (candidate == null) { + leaseUpdateStatistics.onLeaseWithoutCandidate(); + continue; } @@ -350,6 +379,8 @@ public class LeaseUpdater { ); if (candidate == null) { + leaseUpdateStatistics.onLeaseWithoutCandidate(); + continue; } @@ -368,7 +399,18 @@ public class LeaseUpdater { byte[] renewedValue = new LeaseBatch(renewedLeases.values()).bytes(); - var key = PLACEMENTDRIVER_LEASES_KEY; + ByteArray key = PLACEMENTDRIVER_LEASES_KEY; + + if (shouldLogLeaseStatistics()) { + LOG.info( + "Leases updated (printed once per {} iteration(s)): [inCurrentIteration={}, active={}, " + + "currentAssignmentsSize={}].", + LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS, + leaseUpdateStatistics, + activeLeasesCount, + currentAssignmentsSize + ); + } msManager.invoke( or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())), @@ -416,6 +458,8 @@ public class LeaseUpdater { renewedLeases.put(grpId, renewedLease); toBeNegotiated.put(grpId, !lease.isAccepted() && Objects.equals(lease.getLeaseholder(), candidate.name())); + + leaseUpdateStatistics.onLeaseCreate(); } /** @@ -430,6 +474,8 @@ public class LeaseUpdater { Lease renewedLease = lease.prolongLease(newTs); renewedLeases.put(grpId, renewedLease); + + leaseUpdateStatistics.onLeaseProlong(); } /** @@ -445,6 +491,8 @@ public class LeaseUpdater { Lease renewedLease = lease.acceptLease(newTs); renewedLeases.put(grpId, renewedLease); + + leaseUpdateStatistics.onLeasePublish(); } /** @@ -459,6 +507,55 @@ public class LeaseUpdater { return now.after(lease.getExpirationTime()); } + + private boolean shouldLogLeaseStatistics() { + if (LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS < 0) { + return false; + } + + boolean result = ++statisticsLogCounter > LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS; + + if (result) { + statisticsLogCounter = 0; + } + + return result; + } + } + + private static class LeaseStats { + @IgniteToStringInclude + int leasesCreated; + + @IgniteToStringInclude + int leasesPublished; + + @IgniteToStringInclude + int leasesProlonged; + + @IgniteToStringInclude + int leasesWithoutCandidates; + + private void onLeaseCreate() { + leasesCreated++; + } + + private void onLeasePublish() { + leasesPublished++; + } + + private void onLeaseProlong() { + leasesProlonged++; + } + + private void onLeaseWithoutCandidate() { + leasesWithoutCandidates++; + } + + @Override + public String toString() { + return S.toString(this); + } } /** Message handler to process notification from replica side. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 46fd7f31ab..f3c58b4d49 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -170,6 +170,7 @@ import org.apache.ignite.internal.table.distributed.storage.PartitionStorages; import org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.thread.StripedThreadPoolExecutor; +import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; @@ -652,8 +653,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { .invoke(condition, partitionAssignments, Collections.emptyList()) .thenCompose(invokeResult -> { if (invokeResult) { - LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully " - + "written to meta storage [tableId={}, assignments={}]", tableId, newAssignments)); + LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully written" + + " to meta storage [tableId={}, assignments={}]", tableId, assignmentListToString(newAssignments))); return completedFuture(newAssignments); } else { @@ -679,7 +680,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage [tableId={}, " - + "assignments={}]", tableId, realAssignments)); + + "assignments={}]", tableId, assignmentListToString(realAssignments))); return realAssignments; }); @@ -1167,7 +1168,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { assignmentsFuture.thenAccept(assignmentsList -> { LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes [table={}, tableId={}, assignments={}, " - + "revision={}]", tableDescriptor.name(), tableId, assignmentsList, causalityToken)); + + "revision={}]", tableDescriptor.name(), tableId, assignmentListToString(assignmentsList), causalityToken)); }); } @@ -1303,6 +1304,16 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return createPartsFut.thenApply(ignore -> null); } + /** + * Creates a string representation of the given assignments list to use it for logging. + * + * @param assignments List of assignments. + * @return String representation of the given assignments list to use it for logging. + */ + private static String assignmentListToString(List<Set<Assignment>> assignments) { + return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e)); + } + /** * Creates data storage for the provided table. *