saxenapranav commented on code in PR #6314:
URL: https://github.com/apache/hadoop/pull/6314#discussion_r1424996276


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1411,6 +1448,97 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  public AzureBlobFileSystem getMetricFilesystem() throws IOException {
+    if (metricFs == null) {
+      try {
+        Configuration metricConfig = abfsConfiguration.getRawConfiguration();
+        String metricAccountKey = 
metricConfig.get(FS_AZURE_METRIC_ACCOUNT_KEY);
+        final String abfsMetricUrl = metricConfig.get(FS_AZURE_METRIC_URI);
+        if (abfsMetricUrl == null) {
+          return null;
+        }
+        metricConfig.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, metricAccountKey);
+        metricConfig.set(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, 
"false");
+        URI metricUri;
+        metricUri = new URI(FileSystemUriSchemes.ABFS_SCHEME, abfsMetricUrl, 
null, null, null);
+        metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, 
metricConfig);
+      } catch (AzureBlobFileSystemException | URISyntaxException ex) {
+        //do nothing
+      }
+    }
+    return metricFs;
+  }
+
+  private TracingContext getMetricTracingContext() {
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      hostName = "UnknownHost";
+    }
+    return new TracingContext(TracingContext.validateClientCorrelationID(
+        abfsConfiguration.getClientCorrelationId()),
+        hostName, FSOperationType.GET_ATTR, true,
+        abfsConfiguration.getTracingHeaderFormat(),
+        null, abfsCounters.toString());
+  }
+
+  /**
+   * Synchronized method to suspend or resume timer.
+   * @param timerFunctionality resume or suspend.
+   * @param timerTask The timertask object.
+   * @return true or false.
+   */
+  synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
+      TimerTask timerTask) {
+    switch (timerFunctionality) {
+    case RESUME:
+      if (metricCollectionStopped.get()) {
+        resumeTimer();
+      }
+      break;
+    case SUSPEND:
+      long now = System.currentTimeMillis();
+      long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
+      if (metricCollectionEnabled && (now - lastExecutionTime >= 
metricAnalysisPeriod)) {
+        timerTask.cancel();
+        timer.purge();
+        metricCollectionStopped.set(true);
+        return true;
+      }
+      break;
+    default:
+      break;
+    }
+    return false;
+  }
+
+  private void resumeTimer() {
+    metricCollectionStopped.set(false);
+    timer.schedule(new TimerTaskImpl(),
+        metricIdlePeriod,
+        metricIdlePeriod);
+  }
+
+  class TimerTaskImpl extends TimerTask {

Review Comment:
   When the client is closed, let's shut down this timer thread (e.g. by
   cancelling the timer), as it will not be GC'ed automatically.



##########
hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml:
##########
@@ -44,6 +44,16 @@
 <suppressions>
     <suppress checks="ParameterNumber|MagicNumber"
               
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
+    <suppress checks="MagicNumber"

Review Comment:
   Let's use named constants instead of magic numbers; that should remove the
   need for this MagicNumber suppression.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java:
##########
@@ -188,7 +187,7 @@ private void testClientRequestIdForStatusRetry(int status,
     int[] statusCount = new int[1];
     statusCount[0] = 0;
     Mockito.doAnswer(answer -> {
-      if (statusCount[0] <= 5) {
+      if (statusCount[0] <= 10) {

Review Comment:
   Why is this threshold changing from 5 to 10?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1411,6 +1448,97 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  public AzureBlobFileSystem getMetricFilesystem() throws IOException {
+    if (metricFs == null) {
+      try {
+        Configuration metricConfig = abfsConfiguration.getRawConfiguration();
+        String metricAccountKey = 
metricConfig.get(FS_AZURE_METRIC_ACCOUNT_KEY);
+        final String abfsMetricUrl = metricConfig.get(FS_AZURE_METRIC_URI);
+        if (abfsMetricUrl == null) {
+          return null;
+        }
+        metricConfig.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, metricAccountKey);
+        metricConfig.set(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, 
"false");
+        URI metricUri;
+        metricUri = new URI(FileSystemUriSchemes.ABFS_SCHEME, abfsMetricUrl, 
null, null, null);
+        metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, 
metricConfig);

Review Comment:
   Let's ensure we don't create a chain of metric filesystems, as in:
   FS -> metricFs -> metricFSClient -> metricFSClient_metricFS -> ...



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java:
##########
@@ -0,0 +1,268 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_TRACINGMETRICHEADER_FORMAT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
+
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
+import org.junit.Test;
+
+import java.util.Random;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+
+public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
+
+  public ITestAbfsReadFooterMetrics() throws Exception {
+  }
+
+  private static final String TEST_PATH = "/testfile";
+
+  @Test
+  public void testReadFooterMetricsWithParquetAndNonParquet() throws Exception 
{
+    testReadWriteAndSeek(8 * ONE_MB, DEFAULT_READ_BUFFER_SIZE, ONE_KB, 4 * 
ONE_KB);
+  }
+
+  @Test
+  public void testReadFooterMetrics() throws Exception {
+    int bufferSize = MIN_BUFFER_SIZE;
+    final AzureBlobFileSystem fs = getFileSystem();
+    final AbfsConfiguration abfsConfiguration = fs.getAbfsStore()
+        .getAbfsConfiguration();
+    abfsConfiguration.set(FS_AZURE_METRIC_FORMAT, 
String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT));
+    abfsConfiguration.set(FS_AZURE_TRACINGMETRICHEADER_FORMAT,
+        String.valueOf(TracingHeaderFormat.INTERNAL_FOOTER_METRIC_FORMAT));
+    abfsConfiguration.setWriteBufferSize(bufferSize);
+    abfsConfiguration.setReadBufferSize(bufferSize);
+
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+    FSDataOutputStream stream = fs.create(testPath);
+    try {
+      stream.write(b);
+    } finally {
+      stream.close();
+    }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
+        IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
+
+    final byte[] readBuffer = new byte[2 * bufferSize];
+    int result;
+    IOStatisticsSource statisticsSource = null;
+    try (FSDataInputStream inputStream = fs.open(testPath)) {
+      statisticsSource = inputStream;
+      ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
+          new 
TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
+              fs.getFileSystemId(), FSOperationType.READ, true, 0,
+              ((AbfsInputStream) inputStream.getWrappedStream())
+                  .getStreamID()));
+      inputStream.seek(bufferSize);
+      result = inputStream.read(readBuffer, bufferSize, bufferSize);
+      assertNotEquals(-1, result);
+
+      //to test tracingHeader for case with bypassReadAhead == true
+      inputStream.seek(0);
+      byte[] temp = new byte[5];
+      int t = inputStream.read(temp, 0, 1);
+
+      inputStream.seek(0);
+      result = inputStream.read(readBuffer, 0, bufferSize);
+    }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
+        IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+
+    assertNotEquals("data read in final read()", -1, result);
+    assertArrayEquals(readBuffer, b);
+    AbfsReadFooterMetrics abfsReadFooterMetrics = fs.getAbfsClient()
+        .getAbfsCounters()
+        .getAbfsReadFooterMetrics();
+    String readFooterMetric = "";
+    if (abfsReadFooterMetrics != null) {
+      readFooterMetric = abfsReadFooterMetrics.toString();
+    }
+    assertEquals(
+        
"$NonParquet:$FR=16384.000_16384.000$SR=1.000_16384.000$FL=32768.000$RL=16384.000",

Review Comment:
   There is a grammar behind this string. Can we have a grammar ingestor that
   takes the values and builds the string, and then assert against that string?
   The production code could also use the same ingestor in `toString()`. The
   benefit is that string creation in the production code and the test code
   would always stay in sync.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.junit.Test;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
+import java.lang.reflect.Method;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Assert;
+import java.net.HttpURLConnection;
+
+public class TestAbfsRestOperation extends
+    AbstractAbfsIntegrationTest {
+
+  public TestAbfsRestOperation() throws Exception {
+  }
+
+  @Test
+  public void testBackoffRetryMetrics() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient testClient = 
super.getAbfsClient(super.getAbfsStore(getFileSystem()));
+
+    // Mock instance of AbfsRestOperation
+    AbfsRestOperation op = ITestAbfsClient.getRestOp(
+        DeletePath, testClient, HTTP_METHOD_DELETE,
+       ITestAbfsClient.getTestUrl(testClient, "/NonExistingPath"), 
ITestAbfsClient.getTestRequestHeaders(testClient));
+
+    ArrayList<Integer> retryCounts = new ArrayList<>(Arrays.asList(35, 28, 31, 
45, 10, 2, 9));
+    int statusCode = HttpURLConnection.HTTP_UNAVAILABLE;
+    Method getMetrics = 
AbfsRestOperation.class.getDeclaredMethod("updateBackoffMetrics", int.class, 
int.class);

Review Comment:
   Can we avoid the reflection API and instead get this method called through
   a valid flow?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java:
##########
@@ -63,6 +67,12 @@ public class AbfsCountersImpl implements AbfsCounters {
 
   private final IOStatisticsStore ioStatisticsStore;
 
+  private AtomicReference<AbfsBackoffMetrics> abfsBackoffMetrics = null;
+
+  private AtomicReference<AbfsReadFooterMetrics> abfsReadFooterMetrics = null;

Review Comment:
   Why are we wrapping these in an AtomicReference? For the outside world,
   they are visible only through atomicReference.get().



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1411,6 +1448,97 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  public AzureBlobFileSystem getMetricFilesystem() throws IOException {
+    if (metricFs == null) {
+      try {
+        Configuration metricConfig = abfsConfiguration.getRawConfiguration();
+        String metricAccountKey = 
metricConfig.get(FS_AZURE_METRIC_ACCOUNT_KEY);
+        final String abfsMetricUrl = metricConfig.get(FS_AZURE_METRIC_URI);
+        if (abfsMetricUrl == null) {
+          return null;
+        }
+        metricConfig.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, metricAccountKey);
+        metricConfig.set(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, 
"false");
+        URI metricUri;
+        metricUri = new URI(FileSystemUriSchemes.ABFS_SCHEME, abfsMetricUrl, 
null, null, null);
+        metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, 
metricConfig);
+      } catch (AzureBlobFileSystemException | URISyntaxException ex) {
+        //do nothing
+      }
+    }
+    return metricFs;
+  }
+
+  private TracingContext getMetricTracingContext() {
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      hostName = "UnknownHost";
+    }
+    return new TracingContext(TracingContext.validateClientCorrelationID(
+        abfsConfiguration.getClientCorrelationId()),
+        hostName, FSOperationType.GET_ATTR, true,
+        abfsConfiguration.getTracingHeaderFormat(),
+        null, abfsCounters.toString());
+  }
+
+  /**
+   * Synchronized method to suspend or resume timer.
+   * @param timerFunctionality resume or suspend.
+   * @param timerTask The timertask object.
+   * @return true or false.
+   */
+  synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,

Review Comment:
   We can make the method non-synchronized and synchronize only its core parts.
   The core parts are invoked only occasionally, not on every call (they are
   guarded by tight timing-based if conditions). The benefit is that we will
   not slow down the system when synchronization is not required.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java:
##########
@@ -716,6 +723,10 @@ public synchronized void close() throws IOException {
     }
   }
 
+  public void sendMetric(TracingContext tracingContextMetric) throws 
AzureBlobFileSystemException {

Review Comment:
   Let's make it private. I don't think there should be a flow from the client
   to the FS; there should not be an FS object in the client.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.ArrayList;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+
+public class AbfsReadFooterMetrics {
+  private final AtomicBoolean isParquetFile;
+  private final AtomicBoolean isParquetEvaluated;
+  private final AtomicBoolean isLenUpdated;
+  private String sizeReadByFirstRead;
+  private String offsetDiffBetweenFirstAndSecondRead;
+  private final AtomicLong fileLength;
+  private double avgFileLength;
+  private double avgReadLenRequested;
+  private final AtomicBoolean collectMetrics;
+  private final AtomicBoolean collectMetricsForNextRead;
+  private final AtomicBoolean collectLenMetrics;
+  private final AtomicLong dataLenRequested;
+  private final AtomicLong offsetOfFirstRead;
+  private final AtomicInteger readCount;
+  private final Map<String, AbfsReadFooterMetrics> metricsMap;
+
+  public AbfsReadFooterMetrics() {
+    this.isParquetFile = new AtomicBoolean(false);
+    this.isParquetEvaluated = new AtomicBoolean(false);
+    this.isLenUpdated = new AtomicBoolean(false);
+    this.fileLength = new AtomicLong();
+    this.readCount = new AtomicInteger(0);
+    this.offsetOfFirstRead = new AtomicLong();
+    this.collectMetrics = new AtomicBoolean(false);
+    this.collectMetricsForNextRead = new AtomicBoolean(false);
+    this.collectLenMetrics = new AtomicBoolean(false);
+    this.dataLenRequested = new AtomicLong(0);
+    this.metricsMap = new ConcurrentHashMap<>();
+  }
+
+  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
+    return metricsMap;
+  }
+
+  private AtomicBoolean getIsParquetFile() {
+    return isParquetFile;
+  }
+
+  private String getSizeReadByFirstRead() {
+    return sizeReadByFirstRead;
+  }
+
+  private void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
+    this.sizeReadByFirstRead = sizeReadByFirstRead;
+  }
+
+  private String getOffsetDiffBetweenFirstAndSecondRead() {
+    return offsetDiffBetweenFirstAndSecondRead;
+  }
+
+  private void setOffsetDiffBetweenFirstAndSecondRead(final String 
offsetDiffBetweenFirstAndSecondRead) {
+    this.offsetDiffBetweenFirstAndSecondRead
+        = offsetDiffBetweenFirstAndSecondRead;
+  }
+
+  private AtomicLong getFileLength() {
+    return fileLength;
+  }
+
+  private double getAvgFileLength() {
+    return avgFileLength;
+  }
+
+  private void setAvgFileLength(final double avgFileLength) {
+    this.avgFileLength = avgFileLength;
+  }
+
+  private double getAvgReadLenRequested() {
+    return avgReadLenRequested;
+  }
+
+  private void setAvgReadLenRequested(final double avgReadLenRequested) {
+    this.avgReadLenRequested = avgReadLenRequested;
+  }
+
+  private AtomicBoolean getCollectMetricsForNextRead() {
+    return collectMetricsForNextRead;
+  }
+
+  private AtomicLong getOffsetOfFirstRead() {
+    return offsetOfFirstRead;
+  }
+
+  private AtomicInteger getReadCount() {
+    return readCount;
+  }
+
+  private AtomicBoolean getCollectLenMetrics() {
+    return collectLenMetrics;
+  }
+
+  private AtomicLong getDataLenRequested() {
+    return dataLenRequested;
+  }
+
+  private AtomicBoolean getCollectMetrics() {
+    return collectMetrics;
+  }
+
+  private AtomicBoolean getIsParquetEvaluated() {
+    return isParquetEvaluated;
+  }
+
+  private AtomicBoolean getIsLenUpdated() {
+    return isLenUpdated;
+  }
+
+  /**
+   * Updates the metrics map with an entry for the specified file if it 
doesn't already exist.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   */
+  public void updateMap(String filePathIdentifier) {
+    // If the file is not already in the metrics map, add it with a new 
AbfsReadFooterMetrics object.
+    metricsMap.computeIfAbsent(filePathIdentifier, key -> new 
AbfsReadFooterMetrics());
+  }
+
+  /**
+   * Checks and updates metrics for a specific file identified by 
filePathIdentifier.
+   * If the metrics do not exist for the file, they are initialized.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   * @param len               The length of the read operation.
+   * @param contentLength     The total content length of the file.
+   * @param nextReadPos       The position of the next read operation.
+   */
+  public void checkMetricUpdate(final String filePathIdentifier, final int 
len, final long contentLength,
+      final long nextReadPos) {
+    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
+            filePathIdentifier, key -> new AbfsReadFooterMetrics());
+    if (readFooterMetrics.getReadCount().get() == 0
+        || (readFooterMetrics.getReadCount().get() >= 1
+        && readFooterMetrics.getCollectMetrics().get())) {
+      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+    }
+  }
+
+  /**
+   * Updates metrics for a specific file identified by filePathIdentifier.
+   *
+   * @param filePathIdentifier  The unique identifier for the file.
+   * @param len                The length of the read operation.
+   * @param contentLength      The total content length of the file.
+   * @param nextReadPos        The position of the next read operation.
+   */
+  private void updateMetrics(final String filePathIdentifier, final int len, 
final long contentLength,
+                             final long nextReadPos) {
+    AbfsReadFooterMetrics readFooterMetrics = 
metricsMap.get(filePathIdentifier);
+
+    // Create a new AbfsReadFooterMetrics object if it doesn't exist in the 
metricsMap.
+    if (readFooterMetrics == null) {
+      readFooterMetrics = new AbfsReadFooterMetrics();
+      metricsMap.put(filePathIdentifier, readFooterMetrics);
+    }
+
+    int readCount = readFooterMetrics.getReadCount().incrementAndGet();
+
+    if (readCount == 1) {
+      // Update metrics for the first read.
+      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, 
contentLength);
+    }
+
+    if (readFooterMetrics.getCollectLenMetrics().get()) {
+      readFooterMetrics.getDataLenRequested().getAndAdd(len);
+    }
+
+    if (readCount == 2) {
+      // Update metrics for the second read.
+      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+    }
+  }
+
+  /**
+   * Updates metrics for the first read operation.
+   *
+   * @param readFooterMetrics The metrics object to update.
+   * @param nextReadPos       The position of the next read operation.
+   * @param len               The length of the read operation.
+   * @param contentLength     The total content length of the file.
+   */
+  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics 
readFooterMetrics, long nextReadPos, int len, long contentLength) {
+    if (nextReadPos >= contentLength - 20 * ONE_KB) {
+      readFooterMetrics.getCollectMetrics().set(true);
+      readFooterMetrics.getCollectMetricsForNextRead().set(true);
+      readFooterMetrics.getOffsetOfFirstRead().set(nextReadPos);
+      readFooterMetrics.setSizeReadByFirstRead(len + "_" + 
Math.abs(contentLength - nextReadPos));
+      readFooterMetrics.getFileLength().set(contentLength);
+    }
+  }
+
+  /**
+   * Updates metrics for the second read operation.
+   *
+   * @param readFooterMetrics The metrics object to update.
+   * @param nextReadPos       The position of the next read operation.
+   * @param len               The length of the read operation.
+   */
+  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics 
readFooterMetrics, long nextReadPos, int len) {
+    if (readFooterMetrics.getCollectMetricsForNextRead().get()) {
+      long offsetDiff = Math.abs(nextReadPos - 
readFooterMetrics.getOffsetOfFirstRead().get());
+      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + 
offsetDiff);
+      readFooterMetrics.getCollectLenMetrics().set(true);
+    }
+  }
+
+
+  /**
+   * Check if the given metrics should be marked as a Parquet file.
+   *
+   * @param metrics The metrics to evaluate.
+   * @return True if the metrics meet the criteria for being marked as a 
Parquet file, false otherwise.
+   */
+  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
+    return metrics.getCollectMetrics().get()
+            && metrics.getReadCount().get() >= 2
+            && !metrics.getIsParquetEvaluated().get()
+            && haveEqualValues(metrics.getSizeReadByFirstRead())
+            && 
haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
+  }
+
+  /**
+   * Check if two values are equal, considering they are in the format 
"value1_value2".
+   *
+   * @param value The value to check.
+   * @return True if the two parts of the value are equal, false otherwise.
+   */
+  private boolean haveEqualValues(String value) {
+    String[] parts = value.split("_");
+    return parts.length == 2 && parts[0].equals(parts[1]);
+  }
+
+  /**
+   * Mark the given metrics as a Parquet file and update related values.
+   *
+   * @param metrics The metrics to mark as Parquet.
+   */
+  private void markAsParquet(AbfsReadFooterMetrics metrics) {
+    metrics.getIsParquetFile().set(true);
+    String[] parts = metrics.getSizeReadByFirstRead().split("_");
+    metrics.setSizeReadByFirstRead(parts[0]);
+    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
+    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
+    metrics.getIsParquetEvaluated().set(true);
+  }
+
+  /**
+   * Check each metric in the provided map and mark them as Parquet files if 
they meet the criteria.
+   *
+   * @param metricsMap The map containing metrics to evaluate.
+   */
+  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
+    for (Map.Entry<String, AbfsReadFooterMetrics> entry : 
metricsMap.entrySet()) {
+      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
+      if (shouldMarkAsParquet(readFooterMetrics)) {
+        markAsParquet(readFooterMetrics);
+        metricsMap.replace(entry.getKey(), readFooterMetrics);
+      }
+    }
+  }
+
+  /**
+   * Updates the average read length requested for metrics of all files in the 
metrics map.
+   * If the metrics indicate that the update is needed, it calculates the 
average read length and updates the metrics.
+   *
+   * @param metricsMap A map containing metrics for different files with 
unique identifiers.
+   */
+  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> 
metricsMap) {
+    for (Map.Entry<String, AbfsReadFooterMetrics> entry : 
metricsMap.entrySet()) {
+      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
+      if (readFooterMetrics.getCollectMetrics().get() && 
readFooterMetrics.getReadCount().get() > 2) {
+        if (!readFooterMetrics.getIsLenUpdated().get()) {
+          int readReqCount = readFooterMetrics.getReadCount().get() - 2;
+          readFooterMetrics.setAvgReadLenRequested(
+                  (double) readFooterMetrics.getDataLenRequested().get()
+                          / readReqCount);
+        }
+        readFooterMetrics.getIsLenUpdated().set(true);
+        metricsMap.replace(entry.getKey(), readFooterMetrics);
+      }
+    }
+  }
+
+  /**
+   * Calculates the average metrics from a list of AbfsReadFooterMetrics and 
sets the values in the provided 'avgParquetReadFooterMetrics' object.
+   *
+   * @param isParquetList The list of AbfsReadFooterMetrics to compute the 
averages from.
+   * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics 
object to store the computed average values.
+   *
+   * This method calculates various average metrics from the provided list and 
sets them in the 'avgParquetReadFooterMetrics' object.
+   * The metrics include:
+   * - Size read by the first read
+   * - Offset difference between the first and second read
+   * - Average file length
+   * - Average requested read length
+   */
+  private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> 
isParquetList,
+      AbfsReadFooterMetrics avgParquetReadFooterMetrics){
+    avgParquetReadFooterMetrics.setSizeReadByFirstRead(
+        String.format("%.3f", isParquetList.stream()
+            .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
+                Double::parseDouble).average().orElse(0.0)));
+    avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
+        String.format("%.3f", isParquetList.stream()
+            .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
+            .mapToDouble(Double::parseDouble).average().orElse(0.0)));
+    avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
+        .map(AbfsReadFooterMetrics::getFileLength)
+        .mapToDouble(AtomicLong::get).average().orElse(0.0));
+    avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
+        map(AbfsReadFooterMetrics::getAvgReadLenRequested).
+        mapToDouble(Double::doubleValue).average().orElse(0.0));
+  }
+
+  private void 
getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> 
isNonParquetList, AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) {
+    int size = 
isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
+    double[] store = new double[2 * size];
+    // Calculating sum of individual values
+    isNonParquetList.forEach(abfsReadFooterMetrics -> {
+      String[] firstReadSize = 
abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
+      String[] offDiffFirstSecondRead = 
abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
+
+      for (int i = 0; i < firstReadSize.length; i++) {
+        store[i] += Long.parseLong(firstReadSize[i]);
+        store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
+      }
+    });
+
+    // Calculating averages and creating formatted strings
+    StringJoiner firstReadSize = new StringJoiner("_");
+    StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
+
+    for (int j = 0; j < size; j++) {
+      firstReadSize.add(String.format("%.3f", store[j] / 
isNonParquetList.size()));
+      offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / 
isNonParquetList.size()));
+    }
+
+    
avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
+    
avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
+    avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
+            .mapToDouble(metrics -> 
metrics.getFileLength().doubleValue()).average().orElse(0.0));
+    
avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
+            
.mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
+  }
+
+  /*
+  Acronyms:
+  1.FR :- First Read (In case of parquet we only maintain the size requested 
by application for
+  the first read, in case of non parquet we maintain a string separated by "_" 
delimiter where the first
+  substring represents the len requested for first read and the second 
substring represents the seek pointer difference from the
+  end of the file.)
+  2.SR :- Second Read (In case of parquet we only maintain the size requested 
by application for
+  the second read, in case of non parquet we maintain a string separated by 
"_" delimiter where the first
+  substring represents the len requested for second read and the second 
substring represents the seek pointer difference from the
+  offset of the first read.)
+  3.FL :- Total length of the file requested for read
+   */
+  private String getReadFooterMetrics(AbfsReadFooterMetrics 
avgReadFooterMetrics) {
+    String readFooterMetric = "";
+    if (avgReadFooterMetrics.getIsParquetFile().get()) {
+      readFooterMetric += "$Parquet:";
+    } else {
+      readFooterMetric += "$NonParquet:";
+    }
+    readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
+        + "$SR="
+        + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
+        + "$FL=" + String.format("%.3f",
+        avgReadFooterMetrics.getAvgFileLength())
+        + "$RL=" + String.format("%.3f",
+        avgReadFooterMetrics.getAvgReadLenRequested());
+    return readFooterMetric;
+  }
+
+  private String getFooterMetrics(List<AbfsReadFooterMetrics> 
readFooterMetricsList, String readFooterMetric){
+    List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
+    List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
+    for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
+      if (abfsReadFooterMetrics.getIsParquetFile().get()) {
+        isParquetList.add(abfsReadFooterMetrics);
+      } else {
+        isNonParquetList.add(abfsReadFooterMetrics);
+      }
+    }
+    AbfsReadFooterMetrics avgParquetReadFooterMetrics = new 
AbfsReadFooterMetrics();
+    AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new 
AbfsReadFooterMetrics();
+
+    if (!isParquetList.isEmpty()){
+      avgParquetReadFooterMetrics.getIsParquetFile().set(true);
+      getParquetReadFooterMetricsAverage(isParquetList, 
avgParquetReadFooterMetrics);
+      readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
+    }
+    if (!isNonParquetList.isEmpty()) {
+      avgNonparquetReadFooterMetrics.getIsParquetFile().set(false);
+      getNonParquetReadFooterMetricsAverage(isNonParquetList, 
avgNonparquetReadFooterMetrics);
+      readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
+    }
+    return readFooterMetric + "";
+  }
+
+  @Override
+  public String toString() {
+    Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
+    List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
+    if (metricsMap != null && !(metricsMap.isEmpty())) {
+      checkIsParquet(metricsMap);
+      updateLenRequested(metricsMap);
+      for (Map.Entry<String, AbfsReadFooterMetrics> entry : 
metricsMap.entrySet()) //using map.entrySet() for iteration

Review Comment:
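   The iteration order of `entrySet()` is not guaranteed to be stable unless the map is an ordered implementation (for example `LinkedHashMap` or `TreeMap`). Entries may therefore be visited in a different order on each call, which can change the order of the emitted metrics.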



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org


Reply via email to