saxenapranav commented on code in PR #6314:
URL: https://github.com/apache/hadoop/pull/6314#discussion_r1424996276
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1411,6 +1448,97 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  public AzureBlobFileSystem getMetricFilesystem() throws IOException {
+    if (metricFs == null) {
+      try {
+        Configuration metricConfig = abfsConfiguration.getRawConfiguration();
+        String metricAccountKey = metricConfig.get(FS_AZURE_METRIC_ACCOUNT_KEY);
+        final String abfsMetricUrl = metricConfig.get(FS_AZURE_METRIC_URI);
+        if (abfsMetricUrl == null) {
+          return null;
+        }
+        metricConfig.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, metricAccountKey);
+        metricConfig.set(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, "false");
+        URI metricUri;
+        metricUri = new URI(FileSystemUriSchemes.ABFS_SCHEME, abfsMetricUrl, null, null, null);
+        metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, metricConfig);
+      } catch (AzureBlobFileSystemException | URISyntaxException ex) {
+        //do nothing
+      }
+    }
+    return metricFs;
+  }
+
+  private TracingContext getMetricTracingContext() {
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      hostName = "UnknownHost";
+    }
+    return new TracingContext(TracingContext.validateClientCorrelationID(
+        abfsConfiguration.getClientCorrelationId()),
+        hostName, FSOperationType.GET_ATTR, true,
+        abfsConfiguration.getTracingHeaderFormat(),
+        null, abfsCounters.toString());
+  }
+
+  /**
+   * Synchronized method to suspend or resume timer.
+   * @param timerFunctionality resume or suspend.
+   * @param timerTask The timertask object.
+   * @return true or false.
+   */
+  synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
+      TimerTask timerTask) {
+    switch (timerFunctionality) {
+      case RESUME:
+        if (metricCollectionStopped.get()) {
+          resumeTimer();
+        }
+        break;
+      case SUSPEND:
+        long now = System.currentTimeMillis();
+        long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
+        if (metricCollectionEnabled && (now - lastExecutionTime >= metricAnalysisPeriod)) {
+          timerTask.cancel();
+          timer.purge();
+          metricCollectionStopped.set(true);
+          return true;
+        }
+        break;
+      default:
+        break;
+    }
+    return false;
+  }
+
+  private void resumeTimer() {
+    metricCollectionStopped.set(false);
+    timer.schedule(new TimerTaskImpl(),
+        metricIdlePeriod,
+        metricIdlePeriod);
+  }
+
+  class TimerTaskImpl extends TimerTask {

Review Comment:
   When the client is closed, let's shut down this timer thread as well, since it will not be GC'ed automatically (a sketch follows below).

##########
hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml:
##########
@@ -44,6 +44,16 @@
 <suppressions>
     <suppress checks="ParameterNumber|MagicNumber"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
+    <suppress checks="MagicNumber"

Review Comment:
   Let's use the named constants instead; that should remove the need for this suppression.

##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java:
##########
@@ -188,7 +187,7 @@ private void testClientRequestIdForStatusRetry(int status,
     int[] statusCount = new int[1];
     statusCount[0] = 0;
     Mockito.doAnswer(answer -> {
-      if (statusCount[0] <= 5) {
+      if (statusCount[0] <= 10) {

Review Comment:
   Why is this changing?
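   Regarding the TimerTaskImpl comment above: a minimal sketch of the suggested shutdown, assuming AbfsClient keeps a reference to the scheduled task (the runningTimerTask field name is hypothetical, not in the diff):

       // In AbfsClient#close(): stop the metric timer so its background
       // thread does not outlive the client.
       @Override
       public void close() throws IOException {
         if (runningTimerTask != null) {  // hypothetical field holding the scheduled TimerTaskImpl
           runningTimerTask.cancel();     // prevent further executions of this task
           timer.purge();                 // drop the cancelled task from the timer's queue
         }
         timer.cancel();                  // terminate the Timer's background thread
         // ... existing close logic ...
       }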
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1411,6 +1448,97 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  public AzureBlobFileSystem getMetricFilesystem() throws IOException {
+    if (metricFs == null) {
+      try {
+        Configuration metricConfig = abfsConfiguration.getRawConfiguration();
+        String metricAccountKey = metricConfig.get(FS_AZURE_METRIC_ACCOUNT_KEY);
+        final String abfsMetricUrl = metricConfig.get(FS_AZURE_METRIC_URI);
+        if (abfsMetricUrl == null) {
+          return null;
+        }
+        metricConfig.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, metricAccountKey);
+        metricConfig.set(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, "false");
+        URI metricUri;
+        metricUri = new URI(FileSystemUriSchemes.ABFS_SCHEME, abfsMetricUrl, null, null, null);
+        metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, metricConfig);

Review Comment:
   Let's ensure we don't create a chain of metricFs instances, as in: FS -> metricFs -> metricFSClient -> metricFSClient_metricFS -> ... (a sketch of one possible guard follows below).

##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java:
##########
@@ -0,0 +1,268 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_TRACINGMETRICHEADER_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
+
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
+import org.junit.Test;
+
+import java.util.Random;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+
+public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
+
+  public ITestAbfsReadFooterMetrics() throws Exception {
+  }
+
+  private static final String TEST_PATH = "/testfile";
+
+  @Test
+  public void testReadFooterMetricsWithParquetAndNonParquet() throws Exception {
+    testReadWriteAndSeek(8 * ONE_MB, DEFAULT_READ_BUFFER_SIZE, ONE_KB, 4 * ONE_KB);
+  }
+
+  @Test
+  public void testReadFooterMetrics() throws Exception {
+    int bufferSize = MIN_BUFFER_SIZE;
+    final AzureBlobFileSystem fs = getFileSystem();
+    final AbfsConfiguration abfsConfiguration = fs.getAbfsStore()
+        .getAbfsConfiguration();
+    abfsConfiguration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT));
+    abfsConfiguration.set(FS_AZURE_TRACINGMETRICHEADER_FORMAT,
+        String.valueOf(TracingHeaderFormat.INTERNAL_FOOTER_METRIC_FORMAT));
+    abfsConfiguration.setWriteBufferSize(bufferSize);
+    abfsConfiguration.setReadBufferSize(bufferSize);
+
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+    FSDataOutputStream stream = fs.create(testPath);
+    try {
+      stream.write(b);
+    } finally {
+      stream.close();
+    }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
+        IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
+
+    final byte[] readBuffer = new byte[2 * bufferSize];
+    int result;
+    IOStatisticsSource statisticsSource = null;
+    try (FSDataInputStream inputStream = fs.open(testPath)) {
+      statisticsSource = inputStream;
+      ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
+          new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
+              fs.getFileSystemId(), FSOperationType.READ, true, 0,
+              ((AbfsInputStream) inputStream.getWrappedStream())
+                  .getStreamID()));
+      inputStream.seek(bufferSize);
+      result = inputStream.read(readBuffer, bufferSize, bufferSize);
+      assertNotEquals(-1, result);
+
+      //to test tracingHeader for case with bypassReadAhead == true
+      inputStream.seek(0);
+      byte[] temp = new byte[5];
+      int t = inputStream.read(temp, 0, 1);
+
+      inputStream.seek(0);
+      result = inputStream.read(readBuffer, 0, bufferSize);
+    }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
+        IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+
+    assertNotEquals("data read in final read()", -1, result);
+    assertArrayEquals(readBuffer, b);
+    AbfsReadFooterMetrics abfsReadFooterMetrics = fs.getAbfsClient()
+        .getAbfsCounters()
+        .getAbfsReadFooterMetrics();
+    String readFooterMetric = "";
+    if (abfsReadFooterMetrics != null) {
+      readFooterMetric = abfsReadFooterMetrics.toString();
+    }
+    assertEquals(
+        "$NonParquet:$FR=16384.000_16384.000$SR=1.000_16384.000$FL=32768.000$RL=16384.000",

Review Comment:
   There is a grammar behind this string. Can we have a grammar ingestor which ingests the values to create the string, and assert against that string? The production code could use the same grammar ingestor in `toString()`. The benefit is that production-code and test-code string creation would always stay in sync (a sketch follows below).

##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.junit.Test;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
+import java.lang.reflect.Method;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Assert;
+import java.net.HttpURLConnection;
+
+public class TestAbfsRestOperation extends
+    AbstractAbfsIntegrationTest {
+
+  public TestAbfsRestOperation() throws Exception {
+  }
+
+  @Test
+  public void testBackoffRetryMetrics() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient testClient = super.getAbfsClient(super.getAbfsStore(getFileSystem()));
+
+    // Mock instance of AbfsRestOperation
+    AbfsRestOperation op = ITestAbfsClient.getRestOp(
+        DeletePath, testClient, HTTP_METHOD_DELETE,
+        ITestAbfsClient.getTestUrl(testClient, "/NonExistingPath"), ITestAbfsClient.getTestRequestHeaders(testClient));
+
+    ArrayList<Integer> retryCounts = new ArrayList<>(Arrays.asList(35, 28, 31, 45, 10, 2, 9));
+    int statusCode = HttpURLConnection.HTTP_UNAVAILABLE;
+    Method getMetrics = AbfsRestOperation.class.getDeclaredMethod("updateBackoffMetrics", int.class, int.class);

Review Comment:
   Can we avoid the reflection API and get this method called through a valid flow instead? (A sketch follows below.)
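   On the AbfsClient#getMetricFilesystem() comment above about chained metricFs instances: a minimal sketch of one possible guard, an assumption rather than the definitive fix, where the inner filesystem gets a copy of the config with the metric URI unset so its own AbfsClient never creates a metricFs:

       // Copy the config and drop the metric URI before creating the metric
       // filesystem; the nested AbfsClient then sees no metric URI and the
       // chain stops at the first metricFs.
       Configuration innerConfig = new Configuration(metricConfig);
       innerConfig.unset(FS_AZURE_METRIC_URI);
       metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, innerConfig);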
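   For the footer-metric assertion above: a sketch of the suggested grammar ingestor (class and method names hypothetical), one place that owns the "$<kind>:$FR=..$SR=..$FL=..$RL=.." grammar so production toString() and the test share it:

       // Hypothetical single owner of the footer-metric string grammar.
       final class ReadFooterMetricFormat {
         private ReadFooterMetricFormat() {
         }

         static String format(boolean parquet, String firstRead, String secondRead,
             double avgFileLength, double avgReadLenRequested) {
           return (parquet ? "$Parquet:" : "$NonParquet:")
               + "$FR=" + firstRead
               + "$SR=" + secondRead
               + "$FL=" + String.format("%.3f", avgFileLength)
               + "$RL=" + String.format("%.3f", avgReadLenRequested);
         }
       }

   The test would then assert against ReadFooterMetricFormat.format(false, "16384.000_16384.000", "1.000_16384.000", 32768.0, 16384.0) instead of a hand-written literal, and getReadFooterMetrics() in AbfsReadFooterMetrics would delegate to the same method.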
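   On avoiding reflection for updateBackoffMetrics: a sketch of the usual alternative, assuming package-private visibility is acceptable (the test already lives in org.apache.hadoop.fs.azurebfs.services); the parameter order here is assumed from the reflective lookup (int.class, int.class):

       // In AbfsRestOperation: widen from private to package-private.
       // Needs: import org.apache.hadoop.classification.VisibleForTesting;
       @VisibleForTesting
       void updateBackoffMetrics(int retryCount, int statusCode) {
         // existing body unchanged
       }

       // In the test, call it directly instead of via Method.invoke():
       for (int retryCount : retryCounts) {
         op.updateBackoffMetrics(retryCount, statusCode);
       }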
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java:
##########
@@ -63,6 +67,12 @@ public class AbfsCountersImpl implements AbfsCounters {
 
   private final IOStatisticsStore ioStatisticsStore;
 
+  private AtomicReference<AbfsBackoffMetrics> abfsBackoffMetrics = null;
+
+  private AtomicReference<AbfsReadFooterMetrics> abfsReadFooterMetrics = null;

Review Comment:
   Why are we wrapping these in AtomicReference? To the outside world they are only visible via atomicReference.get(). (A sketch of the plain-field alternative appears at the end of this message.)

##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1411,6 +1448,97 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  public AzureBlobFileSystem getMetricFilesystem() throws IOException {
+    if (metricFs == null) {
+      try {
+        Configuration metricConfig = abfsConfiguration.getRawConfiguration();
+        String metricAccountKey = metricConfig.get(FS_AZURE_METRIC_ACCOUNT_KEY);
+        final String abfsMetricUrl = metricConfig.get(FS_AZURE_METRIC_URI);
+        if (abfsMetricUrl == null) {
+          return null;
+        }
+        metricConfig.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, metricAccountKey);
+        metricConfig.set(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, "false");
+        URI metricUri;
+        metricUri = new URI(FileSystemUriSchemes.ABFS_SCHEME, abfsMetricUrl, null, null, null);
+        metricFs = (AzureBlobFileSystem) FileSystem.newInstance(metricUri, metricConfig);
+      } catch (AzureBlobFileSystemException | URISyntaxException ex) {
+        //do nothing
+      }
+    }
+    return metricFs;
+  }
+
+  private TracingContext getMetricTracingContext() {
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      hostName = "UnknownHost";
+    }
+    return new TracingContext(TracingContext.validateClientCorrelationID(
+        abfsConfiguration.getClientCorrelationId()),
+        hostName, FSOperationType.GET_ATTR, true,
+        abfsConfiguration.getTracingHeaderFormat(),
+        null, abfsCounters.toString());
+  }
+
+  /**
+   * Synchronized method to suspend or resume timer.
+   * @param timerFunctionality resume or suspend.
+   * @param timerTask The timertask object.
+   * @return true or false.
+   */
+  synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,

Review Comment:
   We can make the method itself non-synchronized and synchronize only its core parts. The core parts are invoked only occasionally, behind tight if conditions on timings, so the benefit is that we do not slow the system down when synchronization is not required. (A sketch appears at the end of this message.)

##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java:
##########
@@ -716,6 +723,10 @@ public synchronized void close() throws IOException {
     }
   }
 
+  public void sendMetric(TracingContext tracingContextMetric) throws AzureBlobFileSystemException {

Review Comment:
   Let's make this private. There should not be a flow from client to fs, i.e. the client should not hold an fs object.

##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+
+public class AbfsReadFooterMetrics {
+  private final AtomicBoolean isParquetFile;
+  private final AtomicBoolean isParquetEvaluated;
+  private final AtomicBoolean isLenUpdated;
+  private String sizeReadByFirstRead;
+  private String offsetDiffBetweenFirstAndSecondRead;
+  private final AtomicLong fileLength;
+  private double avgFileLength;
+  private double avgReadLenRequested;
+  private final AtomicBoolean collectMetrics;
+  private final AtomicBoolean collectMetricsForNextRead;
+  private final AtomicBoolean collectLenMetrics;
+  private final AtomicLong dataLenRequested;
+  private final AtomicLong offsetOfFirstRead;
+  private final AtomicInteger readCount;
+  private final Map<String, AbfsReadFooterMetrics> metricsMap;
+
+  public AbfsReadFooterMetrics() {
+    this.isParquetFile = new AtomicBoolean(false);
+    this.isParquetEvaluated = new AtomicBoolean(false);
+    this.isLenUpdated = new AtomicBoolean(false);
+    this.fileLength = new AtomicLong();
+    this.readCount = new AtomicInteger(0);
+    this.offsetOfFirstRead = new AtomicLong();
+    this.collectMetrics = new AtomicBoolean(false);
+    this.collectMetricsForNextRead = new AtomicBoolean(false);
+    this.collectLenMetrics = new AtomicBoolean(false);
+    this.dataLenRequested = new AtomicLong(0);
+    this.metricsMap = new ConcurrentHashMap<>();
+  }
+
+  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
+    return metricsMap;
+  }
+
+  private AtomicBoolean getIsParquetFile() {
+    return isParquetFile;
+  }
+
+  private String getSizeReadByFirstRead() {
+    return sizeReadByFirstRead;
+  }
+
+  private void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
+    this.sizeReadByFirstRead = sizeReadByFirstRead;
+  }
+
+  private String getOffsetDiffBetweenFirstAndSecondRead() {
+    return offsetDiffBetweenFirstAndSecondRead;
+  }
+
+  private void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) {
+    this.offsetDiffBetweenFirstAndSecondRead
+        = offsetDiffBetweenFirstAndSecondRead;
+  }
+
+  private AtomicLong getFileLength() {
+    return fileLength;
+  }
+
+  private double getAvgFileLength() {
+    return avgFileLength;
+  }
+
+  private void setAvgFileLength(final double avgFileLength) {
+    this.avgFileLength = avgFileLength;
+  }
+
+  private double getAvgReadLenRequested() {
+    return avgReadLenRequested;
+  }
+
+  private void setAvgReadLenRequested(final double avgReadLenRequested) {
+    this.avgReadLenRequested = avgReadLenRequested;
+  }
+
+  private AtomicBoolean getCollectMetricsForNextRead() {
+    return collectMetricsForNextRead;
+  }
+
+  private AtomicLong getOffsetOfFirstRead() {
+    return offsetOfFirstRead;
+  }
+
+  private AtomicInteger getReadCount() {
+    return readCount;
+  }
+
+  private AtomicBoolean getCollectLenMetrics() {
+    return collectLenMetrics;
+  }
+
+  private AtomicLong getDataLenRequested() {
+    return dataLenRequested;
+  }
+
+  private AtomicBoolean getCollectMetrics() {
+    return collectMetrics;
+  }
+
+  private AtomicBoolean getIsParquetEvaluated() {
+    return isParquetEvaluated;
+  }
+
+  private AtomicBoolean getIsLenUpdated() {
+    return isLenUpdated;
+  }
+
+  /**
+   * Updates the metrics map with an entry for the specified file if it doesn't already exist.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   */
+  public void updateMap(String filePathIdentifier) {
+    // If the file is not already in the metrics map, add it with a new AbfsReadFooterMetrics object.
+    metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics());
+  }
+
+  /**
+   * Checks and updates metrics for a specific file identified by filePathIdentifier.
+   * If the metrics do not exist for the file, they are initialized.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   * @param len The length of the read operation.
+   * @param contentLength The total content length of the file.
+   * @param nextReadPos The position of the next read operation.
+   */
+  public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength,
+      final long nextReadPos) {
+    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
+        filePathIdentifier, key -> new AbfsReadFooterMetrics());
+    if (readFooterMetrics.getReadCount().get() == 0
+        || (readFooterMetrics.getReadCount().get() >= 1
+        && readFooterMetrics.getCollectMetrics().get())) {
+      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+    }
+  }
+
+  /**
+   * Updates metrics for a specific file identified by filePathIdentifier.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   * @param len The length of the read operation.
+   * @param contentLength The total content length of the file.
+   * @param nextReadPos The position of the next read operation.
+   */
+  private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength,
+      final long nextReadPos) {
+    AbfsReadFooterMetrics readFooterMetrics = metricsMap.get(filePathIdentifier);
+
+    // Create a new AbfsReadFooterMetrics object if it doesn't exist in the metricsMap.
+    if (readFooterMetrics == null) {
+      readFooterMetrics = new AbfsReadFooterMetrics();
+      metricsMap.put(filePathIdentifier, readFooterMetrics);
+    }
+
+    int readCount = readFooterMetrics.getReadCount().incrementAndGet();
+
+    if (readCount == 1) {
+      // Update metrics for the first read.
+      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, contentLength);
+    }
+
+    if (readFooterMetrics.getCollectLenMetrics().get()) {
+      readFooterMetrics.getDataLenRequested().getAndAdd(len);
+    }
+
+    if (readCount == 2) {
+      // Update metrics for the second read.
+      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+    }
+  }
+
+  /**
+   * Updates metrics for the first read operation.
+   *
+   * @param readFooterMetrics The metrics object to update.
+   * @param nextReadPos The position of the next read operation.
+   * @param len The length of the read operation.
+   * @param contentLength The total content length of the file.
+   */
+  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len, long contentLength) {
+    if (nextReadPos >= contentLength - 20 * ONE_KB) {
+      readFooterMetrics.getCollectMetrics().set(true);
+      readFooterMetrics.getCollectMetricsForNextRead().set(true);
+      readFooterMetrics.getOffsetOfFirstRead().set(nextReadPos);
+      readFooterMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
+      readFooterMetrics.getFileLength().set(contentLength);
+    }
+  }
+
+  /**
+   * Updates metrics for the second read operation.
+   *
+   * @param readFooterMetrics The metrics object to update.
+   * @param nextReadPos The position of the next read operation.
+   * @param len The length of the read operation.
+   */
+  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len) {
+    if (readFooterMetrics.getCollectMetricsForNextRead().get()) {
+      long offsetDiff = Math.abs(nextReadPos - readFooterMetrics.getOffsetOfFirstRead().get());
+      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
+      readFooterMetrics.getCollectLenMetrics().set(true);
+    }
+  }
+
+
+  /**
+   * Check if the given metrics should be marked as a Parquet file.
+   *
+   * @param metrics The metrics to evaluate.
+   * @return True if the metrics meet the criteria for being marked as a Parquet file, false otherwise.
+   */
+  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
+    return metrics.getCollectMetrics().get()
+        && metrics.getReadCount().get() >= 2
+        && !metrics.getIsParquetEvaluated().get()
+        && haveEqualValues(metrics.getSizeReadByFirstRead())
+        && haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
+  }
+
+  /**
+   * Check if two values are equal, considering they are in the format "value1_value2".
+   *
+   * @param value The value to check.
+   * @return True if the two parts of the value are equal, false otherwise.
+   */
+  private boolean haveEqualValues(String value) {
+    String[] parts = value.split("_");
+    return parts.length == 2 && parts[0].equals(parts[1]);
+  }
+
+  /**
+   * Mark the given metrics as a Parquet file and update related values.
+   *
+   * @param metrics The metrics to mark as Parquet.
+   */
+  private void markAsParquet(AbfsReadFooterMetrics metrics) {
+    metrics.getIsParquetFile().set(true);
+    String[] parts = metrics.getSizeReadByFirstRead().split("_");
+    metrics.setSizeReadByFirstRead(parts[0]);
+    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
+    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
+    metrics.getIsParquetEvaluated().set(true);
+  }
+
+  /**
+   * Check each metric in the provided map and mark them as Parquet files if they meet the criteria.
+   *
+   * @param metricsMap The map containing metrics to evaluate.
+   */
+  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
+    for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
+      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
+      if (shouldMarkAsParquet(readFooterMetrics)) {
+        markAsParquet(readFooterMetrics);
+        metricsMap.replace(entry.getKey(), readFooterMetrics);
+      }
+    }
+  }
+
+  /**
+   * Updates the average read length requested for metrics of all files in the metrics map.
+   * If the metrics indicate that the update is needed, it calculates the average read length and updates the metrics.
+   *
+   * @param metricsMap A map containing metrics for different files with unique identifiers.
+   */
+  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> metricsMap) {
+    for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
+      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
+      if (readFooterMetrics.getCollectMetrics().get() && readFooterMetrics.getReadCount().get() > 2) {
+        if (!readFooterMetrics.getIsLenUpdated().get()) {
+          int readReqCount = readFooterMetrics.getReadCount().get() - 2;
+          readFooterMetrics.setAvgReadLenRequested(
+              (double) readFooterMetrics.getDataLenRequested().get()
+                  / readReqCount);
+        }
+        readFooterMetrics.getIsLenUpdated().set(true);
+        metricsMap.replace(entry.getKey(), readFooterMetrics);
+      }
+    }
+  }
+
+  /**
+   * Calculates the average metrics from a list of AbfsReadFooterMetrics and sets the values in the provided 'avgParquetReadFooterMetrics' object.
+   *
+   * @param isParquetList The list of AbfsReadFooterMetrics to compute the averages from.
+   * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics object to store the computed average values.
+   *
+   * This method calculates various average metrics from the provided list and sets them in the 'avgParquetReadFooterMetrics' object.
+   * The metrics include:
+   * - Size read by the first read
+   * - Offset difference between the first and second read
+   * - Average file length
+   * - Average requested read length
+   */
+  private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isParquetList,
+      AbfsReadFooterMetrics avgParquetReadFooterMetrics) {
+    avgParquetReadFooterMetrics.setSizeReadByFirstRead(
+        String.format("%.3f", isParquetList.stream()
+            .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
+                Double::parseDouble).average().orElse(0.0)));
+    avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
+        String.format("%.3f", isParquetList.stream()
+            .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
+            .mapToDouble(Double::parseDouble).average().orElse(0.0)));
+    avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
+        .map(AbfsReadFooterMetrics::getFileLength)
+        .mapToDouble(AtomicLong::get).average().orElse(0.0));
+    avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
+        map(AbfsReadFooterMetrics::getAvgReadLenRequested).
+        mapToDouble(Double::doubleValue).average().orElse(0.0));
+  }
+
+  private void getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isNonParquetList, AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) {
+    int size = isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
+    double[] store = new double[2 * size];
+    // Calculating sum of individual values
+    isNonParquetList.forEach(abfsReadFooterMetrics -> {
+      String[] firstReadSize = abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
+      String[] offDiffFirstSecondRead = abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
+
+      for (int i = 0; i < firstReadSize.length; i++) {
+        store[i] += Long.parseLong(firstReadSize[i]);
+        store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
+      }
+    });
+
+    // Calculating averages and creating formatted strings
+    StringJoiner firstReadSize = new StringJoiner("_");
+    StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
+
+    for (int j = 0; j < size; j++) {
+      firstReadSize.add(String.format("%.3f", store[j] / isNonParquetList.size()));
+      offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / isNonParquetList.size()));
+    }
+
+    avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
+    avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
+    avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
+        .mapToDouble(metrics -> metrics.getFileLength().doubleValue()).average().orElse(0.0));
+    avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
+        .mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
+  }
+
+  /*
+  Acronyms:
+  1.FR :- First Read (In case of parquet we only maintain the size requested by application for
+  the first read, in case of non parquet we maintain a string separated by "_" delimiter where the first
+  substring represents the len requested for first read and the second substring represents the seek pointer difference from the
+  end of the file.)
+  2.SR :- Second Read (In case of parquet we only maintain the size requested by application for
+  the second read, in case of non parquet we maintain a string separated by "_" delimiter where the first
+  substring represents the len requested for second read and the second substring represents the seek pointer difference from the
+  offset of the first read.)
+  3.FL :- Total length of the file requested for read
+  */
+  private String getReadFooterMetrics(AbfsReadFooterMetrics avgReadFooterMetrics) {
+    String readFooterMetric = "";
+    if (avgReadFooterMetrics.getIsParquetFile().get()) {
+      readFooterMetric += "$Parquet:";
+    } else {
+      readFooterMetric += "$NonParquet:";
+    }
+    readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
+        + "$SR="
+        + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
+        + "$FL=" + String.format("%.3f",
+            avgReadFooterMetrics.getAvgFileLength())
+        + "$RL=" + String.format("%.3f",
+            avgReadFooterMetrics.getAvgReadLenRequested());
+    return readFooterMetric;
+  }
+
+  private String getFooterMetrics(List<AbfsReadFooterMetrics> readFooterMetricsList, String readFooterMetric) {
+    List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
+    List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
+    for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
+      if (abfsReadFooterMetrics.getIsParquetFile().get()) {
+        isParquetList.add(abfsReadFooterMetrics);
+      } else {
+        isNonParquetList.add(abfsReadFooterMetrics);
+      }
+    }
+    AbfsReadFooterMetrics avgParquetReadFooterMetrics = new AbfsReadFooterMetrics();
+    AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new AbfsReadFooterMetrics();
+
+    if (!isParquetList.isEmpty()) {
+      avgParquetReadFooterMetrics.getIsParquetFile().set(true);
+      getParquetReadFooterMetricsAverage(isParquetList, avgParquetReadFooterMetrics);
+      readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
+    }
+    if (!isNonParquetList.isEmpty()) {
+      avgNonparquetReadFooterMetrics.getIsParquetFile().set(false);
+      getNonParquetReadFooterMetricsAverage(isNonParquetList, avgNonparquetReadFooterMetrics);
+      readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
+    }
+    return readFooterMetric + "";
+  }
+
+  @Override
+  public String toString() {
+    Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
+    List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
+    if (metricsMap != null && !(metricsMap.isEmpty())) {
+      checkIsParquet(metricsMap);
+      updateLenRequested(metricsMap);
+      for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) //using map.entrySet() for iteration

Review Comment:
   `entrySet` may not return entries in the same order every time. (A sketch of a deterministic alternative follows below.)
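   On the entrySet ordering concern: a sketch of one way to make the iteration deterministic, by sorting the keys before collecting the per-file metrics (an assumption about the loop body, which is truncated above):

       // Iterate file identifiers in sorted order so toString() is stable.
       // Needs: import java.util.Collections;
       List<String> sortedKeys = new ArrayList<>(metricsMap.keySet());
       Collections.sort(sortedKeys);
       for (String key : sortedKeys) {
         readFooterMetricsList.add(metricsMap.get(key));
       }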
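   On the AbfsCountersImpl comment about AtomicReference: a sketch of the plain-field alternative, assuming the references are assigned once during initialization and only read afterwards (volatile covers safe publication if they are set after construction):

       // Plain fields; callers use the object directly instead of calling get().
       private volatile AbfsBackoffMetrics abfsBackoffMetrics;
       private volatile AbfsReadFooterMetrics abfsReadFooterMetrics;

       public AbfsBackoffMetrics getAbfsBackoffMetrics() {
         return abfsBackoffMetrics;
       }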
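   On making timerOrchestrator non-synchronized: a sketch of the suggested shape, keeping the cheap timing checks lock-free and synchronizing only the state transitions (the double check under the lock guards against races between concurrent callers):

       boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
         switch (timerFunctionality) {
           case RESUME:
             if (metricCollectionStopped.get()) {        // cheap lock-free check
               synchronized (this) {                     // lock only for the transition
                 if (metricCollectionStopped.get()) {    // re-check under the lock
                   resumeTimer();
                 }
               }
             }
             break;
           case SUSPEND:
             long now = System.currentTimeMillis();
             long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
             if (metricCollectionEnabled && (now - lastExecutionTime >= metricAnalysisPeriod)) {
               synchronized (this) {                     // lock only when suspension is due
                 timerTask.cancel();
                 timer.purge();
                 metricCollectionStopped.set(true);
                 return true;
               }
             }
             break;
           default:
             break;
         }
         return false;
       }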