steveloughran commented on a change in pull request #2076:
URL: https://github.com/apache/hadoop/pull/2076#discussion_r440792327
##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface for statistics for the AbfsInputStream.
+ */
+@InterfaceStability.Unstable
+public interface AbfsInputStreamStatistics {
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   *
+   * @param negativeOffset how far was the seek?
+   *                       This is expected to be negative.
+   */
+  void seekBackwards(long negativeOffset);
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   *
+   * @param skipped number of bytes skipped by reading from the stream.
+   *                If the seek was implemented by a close + reopen, set this to zero.
+   */
+  void seekForwards(long skipped);
+
+  /**
+   * Record a forward or backward seek, adding a seek operation, a forward or
+   * a backward seek operation, and the number of bytes skipped.
+   *
+   * @param seekTo     seek to the position.
+   * @param currentPos current position.
+   */
+  void seek(long seekTo, long currentPos);
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   *
+   * @param bytes number of bytes read.
+   */
+  void bytesRead(long bytes);
+
+  /**
+   * Record the total bytes read from the buffer.
+   *
+   * @param bytes number of bytes that are read from the buffer.
+   */
+  void bytesReadFromBuffer(long bytes);
+
+  /**
+   * Records the total number of seeks done in the buffer.
+   */
+  void seekInBuffer();
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   *
+   * @param pos starting position of the read.
+   * @param len number of bytes to read.
+   */
+  void readOperationStarted(long pos, long len);
+
+  /**
+   * Records a successful remote read operation.
+   */
+  void remoteReadOperation();
+
+  /**
+   * Builds a string of all the AbfsInputStream statistics.
+   * @return the string with all the statistics.
+   */
+  @Override
+  String toString();

Review comment:
       not needed; remove

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -443,4 +488,26 @@ protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
     this.cachedSasToken = cachedSasToken;
   }
 
+  /**
+   * Getter for AbfsInputStreamStatistics.
+   *
+   * @return an instance of AbfsInputStreamStatistics.
+   */
+  @VisibleForTesting
+  public AbfsInputStreamStatistics getStreamStatistics() {
+    return streamStatistics;
+  }
+
+  /**
+   * Get the statistics of the stream.
+   * @return a string value.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(super.toString());
+    sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+    sb.append(streamStatistics.toString());

Review comment:
       needs check for stats == null
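One way to address this, sketched below; this is an editorial sketch only, not the PR's code. It assumes statistics collection can be disabled (leaving streamStatistics null), and that the method ends by closing the brace and returning the builder's contents, as the appended "{" suggests:

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(super.toString());
      sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
      if (streamStatistics != null) {
        // Guard: streamStatistics may be null when statistics are disabled,
        // so only append it when present to avoid an NPE.
        sb.append(streamStatistics.toString());
      }
      sb.append("}");
      return sb.toString();
    }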
##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
##########
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.io.IOUtils;
+
+public class ITestAbfsInputStreamStatistics
+    extends AbstractAbfsIntegrationTest {
+  private static final int OPERATIONS = 10;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
+  private static final int ONE_MB = 1024 * 1024;
+  private byte[] defBuffer = new byte[ONE_MB];
+
+  public ITestAbfsInputStreamStatistics() throws Exception {
+  }
+
+  /**
+   * Test to check the initial values of the AbfsInputStream statistics.
+   */
+  @Test
+  public void testInitValues() throws IOException {
+    describe("Testing the initial values of AbfsInputStream Statistics");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path initValuesPath = path(getMethodName());
+    AbfsOutputStream outputStream = null;
+    AbfsInputStream inputStream = null;
+
+    try {
+
+      outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
+      inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics());
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();
+
+      checkInitValue(stats.getSeekOperations(), "seekOps");
+      checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
+      checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
+      checkInitValue(stats.getBytesRead(), "bytesRead");
+      checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
+      checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
+      checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
+      checkInitValue(stats.getReadOperations(), "readOps");
+      checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
+      checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
+    }
+  }
+
+  /**
+   * Test to check statistics from seek operations in AbfsInputStream.
+   */
+  @Test
+  public void testSeekStatistics() throws IOException {
+    describe("Testing the values of statistics from seek operations in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path seekStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);
+
+      //Writing a default buffer in a file.
+      out.write(defBuffer);
+      out.hflush();
+      in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics());
+
+      /*
+       * Reading the 1MB buffer from the file; this moves the fCursor
+       * (current position of the cursor) to the end of the file.
+       */
+      int result = in.read(defBuffer, 0, ONE_MB);
+      LOG.info("Result of read : {}", result);
+
+      /*
+       * Seeking to the start of the file and then back to the end results
+       * in a backward and a forward seek, respectively, OPERATIONS times.
+       */
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.seek(0);
+        in.seek(ONE_MB);
+      }
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+      /*
+       * seekOps - Since we do a backward and a forward seek OPERATIONS
+       * times, total seeks would be 2 * OPERATIONS.
+       *
+       * backwardSeekOps - Since we do a backward seek inside a loop
+       * OPERATIONS times, total backward seeks would be OPERATIONS.
+       *
+       * forwardSeekOps - Since we do a forward seek inside a loop
+       * OPERATIONS times, total forward seeks would be OPERATIONS.
+       *
+       * bytesBackwardsOnSeek - Since we seek backwards from the end of a
+       * ONE_MB file each time, the bytes from backward seeks would be
+       * OPERATIONS * ONE_MB. Since this is a backward seek, this value is
+       * expected to be negative.
+       *
+       * bytesSkippedOnSeek - Although we seek from start to end, the
+       * fCursor (position of the cursor) always remains at the end of the
+       * file, so no bytes are skipped on seek: all forward seeks are
+       * within the buffer.
+       *
+       * seekInBuffer - Since all seeks were in the buffer, seekInBuffer
+       * would be equal to 2 * OPERATIONS.
+       *
+       */
+      assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
+          stats.getSeekOperations());
+      assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
+          stats.getBackwardSeekOperations());
+      assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
+          stats.getForwardSeekOperations());
+      assertEquals("Mismatch in bytesBackwardsOnSeek value",
+          -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
+      assertEquals("Mismatch in bytesSkippedOnSeek value",
+          0, stats.getBytesSkippedOnSeek());
+      assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
+          stats.getSeekInBuffer());
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Test to check statistics values from read operations in AbfsInputStream.
+   */
+  @Test
+  public void testReadStatistics() throws IOException {
+    describe("Testing the values of statistics from read operation in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
+
+      /*
+       * Writing a 1MB buffer to the file.
+       */
+      out.write(defBuffer);
+      out.hflush();
+      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
+
+      /*
+       * Reading a single byte from the file OPERATIONS times.
+       */
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.read();
+      }
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      /*
+       * bytesRead - Since a single byte is read each time, total bytes
+       * read would be equal to OPERATIONS.
+       *
+       * readOps - Since the read operation is performed OPERATIONS times,
+       * the total number of read operations would be equal to OPERATIONS.
+       *
+       * remoteReadOps - Only a single remote read operation is done.
+       * Hence, total remote read ops is 1.
+       *
+       */
+      assertEquals("Mismatch in bytesRead value", OPERATIONS,
+          stats.getBytesRead());
+      assertEquals("Mismatch in readOps value", OPERATIONS,
+          stats.getReadOperations());
+      assertEquals("Mismatch in remoteReadOps value", 1,
+          stats.getRemoteReadOperations());
+

Review comment:
       1. close() the stream and ask for the stats again, to verify they are still readable
       2. call toString on opened and closed streams.
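A sketch of the reviewer's two suggestions as they might be appended inside this try block; this is editorial, not the PR's code, and the assertion message is hypothetical. It assumes the statistics object remains valid after close():

      // toString() on the open stream (slf4j calls toString on the argument).
      LOG.info("AbfsInputStream open: {}", in);
      in.close();
      // toString() on the closed stream; must not throw.
      LOG.info("AbfsInputStream closed: {}", in);
      // Statistics should still be readable after the stream is closed.
      assertEquals("Mismatch in readOps value after close()", OPERATIONS,
          stats.getReadOperations());

The later cleanupWithLogger in the finally block would then close an already-closed stream, which is a no-op.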
##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
##########
@@ -0,0 +1,231 @@
+      assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
+          stats.getSeekInBuffer());
+

Review comment:
       1. close() the stream and ask for the stats again, to verify they are still readable
       2. call toString on opened and closed streams.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org