bhattmanish98 commented on code in PR #8137: URL: https://github.com/apache/hadoop/pull/8137#discussion_r2672221769
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsMetricsManager.java: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URL; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; +import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.UriUtils; + +import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; +import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS; +import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.LOG; + +/** + * AbfsMetricsManager is responsible for managing metrics collection + * and emission for an AbfsClient instance. + */ +public class AbfsMetricsManager implements Closeable { + + // Timer thread name for AbfsMetricsManager + public static final String ABFS_CLIENT_TIMER_THREAD_NAME + = "abfs-timer-client"; + + // Timer for scheduling metric emission tasks based on idle time + private Timer timer; + + // URL for sending metrics + private URL metricUrl; + + // Shared key credentials for metric account + private SharedKeyCredentials metricSharedkeyCredentials = null; + + // Currently running TimerTask + private TimerTask runningTimerTask; + + // Metric analysis periods + private final int metricAnalysisPeriod; + + // Metric idle period + private final int metricIdlePeriod; + + // Flag to indicate if a separate metric account is used + private boolean hasSeparateMetricAccount = false; + + // Flag to indicate if metric collection is enabled + private final AtomicBoolean isMetricCollectionEnabled + = new AtomicBoolean(false); + + // Metric format for metrics + private MetricFormat metricFormat; + + // Flag to indicate if metric collection is stopped + private final AtomicBoolean isMetricCollectionStopped; + + // AggregateMetricsManager instance + private final AggregateMetricsManager aggregateMetricsManager; + + // Scheduler to emit aggregated metric based on time + private ScheduledExecutorService metricsEmitScheduler = null; + + // AbfsConfiguration instance + private final AbfsConfiguration abfsConfiguration; + + // AbfsCounters instance + private final AbfsCounters abfsCounters; + + // File system ID + private final String fileSystemId; + + // Storage account name + private final String accountName; + + /** + * Constructor for AbfsMetricsManager. + * + * @param abfsConfiguration AbfsConfiguration object. + * @param abfsCounters AbfsCounters object. + * @param baseUrlString Base URL string of the AbfsClient. + * @param indexLastForwardSlash Index of last forward slash in the base URL string. + * @param accountName Storage account name. + * @param fileSystemId File system ID. + */ + public AbfsMetricsManager(final AbfsConfiguration abfsConfiguration, + final AbfsCounters abfsCounters, final String baseUrlString, + final int indexLastForwardSlash, final String accountName, + final String fileSystemId) { + this.abfsConfiguration = abfsConfiguration; + this.abfsCounters = abfsCounters; + this.fileSystemId = fileSystemId; + this.isMetricCollectionEnabled.set( + abfsConfiguration.isMetricsCollectionEnabled()); + this.isMetricCollectionStopped = new AtomicBoolean(false); + this.aggregateMetricsManager = AggregateMetricsManager.getInstance( + abfsConfiguration.getMetricsEmitIntervalInMins(), + abfsConfiguration.getMaxMetricsCallsPerSecond()); + this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout(); + this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout(); + this.accountName = accountName; + if (isMetricCollectionEnabled()) { + try { + String metricAccountName = abfsConfiguration.getMetricAccount(); + String metricAccountKey = abfsConfiguration.getMetricAccountKey(); + this.metricFormat = abfsConfiguration.getMetricFormat(); + if (isNotEmpty(metricAccountName) && isNotEmpty( + metricAccountKey)) { + int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT); + if (dotIndex <= 0) { + throw new InvalidUriException( + metricAccountName + " - account name is not fully qualified."); + } + try { + metricSharedkeyCredentials = new SharedKeyCredentials( + metricAccountName.substring(0, dotIndex), + metricAccountKey); + hasSeparateMetricAccount = true; + setMetricsUrl(metricAccountName.startsWith(HTTPS_SCHEME) + ? metricAccountName : HTTPS_SCHEME + COLON + + FORWARD_SLASH + FORWARD_SLASH + metricAccountName); + } catch (IllegalArgumentException e) { + throw new IOException( + "Exception while initializing metric credentials ", e); + } + } else { + setMetricsUrl(baseUrlString.substring(0, indexLastForwardSlash + 1)); + } + // Once the metric URL is set, initialize the metrics + abfsCounters.initializeMetrics(metricFormat, abfsConfiguration); + // Metrics emitter scheduler + this.metricsEmitScheduler + = Executors.newSingleThreadScheduledExecutor(); + // run every 1 minute to check the metrics count + this.metricsEmitScheduler.scheduleWithFixedDelay( + () -> { + if (abfsCounters.getAbfsBackoffMetrics() + .getMetricValue(TOTAL_NUMBER_OF_REQUESTS) + >= abfsConfiguration.getMetricsEmitThreshold()) { + emitCollectedMetrics(); + } + }, + abfsConfiguration.getMetricsEmitThresholdIntervalInSecs(), + abfsConfiguration.getMetricsEmitThresholdIntervalInSecs(), + TimeUnit.SECONDS); + + // run every metricInterval minutes + this.metricsEmitScheduler.scheduleWithFixedDelay( + this::emitCollectedMetrics, + abfsConfiguration.getMetricsEmitIntervalInMins(), + abfsConfiguration.getMetricsEmitIntervalInMins(), + TimeUnit.MINUTES); + + // emit metrics based on idea time + if (abfsConfiguration.shouldEmitMetricsOnIdleTime()) { + this.timer = new Timer( + ABFS_CLIENT_TIMER_THREAD_NAME, true); + timer.schedule(new TimerTaskImpl(), + metricIdlePeriod, + metricIdlePeriod); + } + } catch (Exception e) { + LOG.error("Metrics disabled. Failed to initialize metrics for {}", + baseUrlString, e); + this.isMetricCollectionEnabled.set(false); + } + } + } + + /** + * Closes the metrics resources. + * This method cancels any running timer tasks, shuts down the metrics emission scheduler, + * and emits any collected metrics before closing. + */ + @Override + public void close() { + if (runningTimerTask != null) { + runningTimerTask.cancel(); + } + if (timer != null) { Review Comment: timer.purge is redundant after timer.cancel Will add timer = null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
