This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new dd81d60 Add trace and debug log to consistency check (#2583) dd81d60 is described below commit dd81d608bff88835676e756ffcb2a2f6b69c9be8 Author: Mike Miller <mmil...@apache.org> AuthorDate: Thu Mar 24 10:05:44 2022 -0400 Add trace and debug log to consistency check (#2583) * Closes #2577 * Add trace span and time measurement around consistency check so we get an idea of how long metadata scans are taking * Create new property tserver.health.check.interval to make it configurable * Create new method watchCriticalFixedDelay() in ThreadPools Co-authored-by: Keith Turner <ktur...@apache.org> --- .../org/apache/accumulo/core/conf/Property.java | 2 + .../accumulo/core/util/threads/ThreadPools.java | 7 ++ .../org/apache/accumulo/tserver/TabletServer.java | 80 +++++++++++++--------- 3 files changed, 57 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 667d78b..06131bc 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -656,6 +656,8 @@ public enum Property { "1.4.0"), TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request.", "1.4.3"), + TSERV_HEALTH_CHECK_FREQ("tserver.health.check.interval", "30m", PropertyType.TIMEDURATION, + "The time between tablet server health checks.", "2.1.0"), TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests.", "1.4.0"), TSERV_MINTHREADS_TIMEOUT("tserver.server.threads.timeout", "0s", PropertyType.TIMEDURATION, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 73cb308..548852a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -166,6 +166,13 @@ public class ThreadPools { CRITICAL_RUNNING_TASKS.add(future); } + public static void watchCriticalFixedDelay(AccumuloConfiguration aconf, long intervalMillis, + Runnable runnable) { + ScheduledFuture<?> future = getServerThreadPools().createGeneralScheduledExecutorService(aconf) + .scheduleWithFixedDelay(runnable, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS); + CRITICAL_RUNNING_TASKS.add(future); + } + public static void watchNonCriticalScheduledTask(ScheduledFuture<?> future) { NON_CRITICAL_RUNNING_TASKS.add(future); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3de8187..8c33967 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -23,12 +23,17 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalFixedDelay; +import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalScheduledTask; +import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.security.SecureRandom; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -164,6 +169,9 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; + public class TabletServer extends AbstractServer { private static final SecureRandom random = new SecureRandom(); @@ -287,7 +295,7 @@ public class TabletServer extends AbstractServer { } } }), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); + watchNonCriticalScheduledTask(future); } ScheduledFuture<?> future = context.getScheduledExecutor() @@ -304,7 +312,7 @@ public class TabletServer extends AbstractServer { } } }), 5, 5, TimeUnit.SECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); + watchNonCriticalScheduledTask(future); @SuppressWarnings("deprecation") final long walMaxSize = @@ -352,7 +360,7 @@ public class TabletServer extends AbstractServer { this.resourceManager = new TabletServerResourceManager(context); this.security = AuditedSecurityOperation.getInstance(context); - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS)); walMarker = new WalStateManager(context); @@ -803,38 +811,46 @@ public class TabletServer extends AbstractServer { } } }, 0, 5, TimeUnit.SECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); + watchNonCriticalScheduledTask(future); - int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay + long tabletCheckFrequency = aconf.getTimeInMillis(Property.TSERV_HEALTH_CHECK_FREQ); // Periodically check that metadata of tablets matches what is held in memory - ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools() - .createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> { - final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot(); - - Map<KeyExtent,Long> updateCounts = new HashMap<>(); - - // gather updateCounts for each tablet - onlineTabletsSnapshot.forEach((ke, tablet) -> { - updateCounts.put(ke, tablet.getUpdateCount()); - }); - - // gather metadata for all tablets readTablets() - try (TabletsMetadata tabletsMetadata = - getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet()) - .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { - - // for each tablet, compare its metadata to what is held in memory - tabletsMetadata.forEach(tabletMetadata -> { - KeyExtent extent = tabletMetadata.getExtent(); - Tablet tablet = onlineTabletsSnapshot.get(extent); - Long counter = updateCounts.get(extent); - tablet.compareTabletInfo(counter, tabletMetadata); - }); + watchCriticalFixedDelay(aconf, tabletCheckFrequency, () -> { + final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot(); + + Map<KeyExtent,Long> updateCounts = new HashMap<>(); + + // gather updateCounts for each tablet + onlineTabletsSnapshot.forEach((ke, tablet) -> { + updateCounts.put(ke, tablet.getUpdateCount()); + }); + + Instant start = Instant.now(); + Duration duration; + Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan"); + try (Scope scope = mdScanSpan.makeCurrent()) { + // gather metadata for all tablets readTablets() + try (TabletsMetadata tabletsMetadata = + getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet()) + .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { + mdScanSpan.end(); + duration = Duration.between(start, Instant.now()); + log.debug("Metadata scan took {}ms for {} tablets read.", duration.toMillis(), + onlineTabletsSnapshot.keySet().size()); + + // for each tablet, compare its metadata to what is held in memory + for (var tabletMetadata : tabletsMetadata) { + KeyExtent extent = tabletMetadata.getExtent(); + Tablet tablet = onlineTabletsSnapshot.get(extent); + Long counter = updateCounts.get(extent); + tablet.compareTabletInfo(counter, tabletMetadata); } - }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES)); + } + } + }); final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15); - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS)); @@ -963,7 +979,7 @@ public class TabletServer extends AbstractServer { }; ScheduledFuture<?> future = context.getScheduledExecutor() .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); + watchNonCriticalScheduledTask(future); } public String getClientAddressString() { @@ -1030,7 +1046,7 @@ public class TabletServer extends AbstractServer { ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); + watchNonCriticalScheduledTask(future); } public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {