steveloughran commented on a change in pull request #2353: URL: https://github.com/apache/hadoop/pull/2353#discussion_r504849729
########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java ########## @@ -279,7 +289,15 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); - op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get()); + if (ioStatistics != null) { Review comment: the methods in IOStatisticsBinding now all take a null DurationTrackerFactory, so you don't need the two branches here any more ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java ########## @@ -73,7 +73,36 @@ READ_THROTTLES("read_throttles", "Total number of times a read operation is throttled."), WRITE_THROTTLES("write_throttles", - "Total number of times a write operation is throttled."); + "Total number of times a write operation is throttled."), + + //OutputStream statistics. + BYTES_TO_UPLOAD("bytes_upload", Review comment: these should all be in hadoop-common *where possible*, so that we have consistent names everywhere for aggregation ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java ########## @@ -73,6 +78,8 @@ private long bytesFromReadAhead; // bytes read from readAhead; for testing private long bytesFromRemoteRead; // bytes read remotely; for testing + private IOStatistics ioStatistics; Review comment: final? 
########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java ########## @@ -90,9 +107,7 @@ public void seek(long seekTo, long currentPos) { */ @Override public void bytesRead(long bytes) { - if (bytes > 0) { - bytesRead += bytes; - } + ioStatisticsStore.incrementCounter(STREAM_READ_BYTES, bytes); Review comment: One thing to consider here is the cost of the map lookup on every IOP. You can ask the IOStatisticsStore for a reference to the atomic counter, and use that directly. I'm doing that for the output stream; reviewing this patch makes me realise I should be doing it for read as well — at least for the read byte counters, which are incremented on every read. bytesUploaded = store.getCounterReference( STREAM_WRITE_TOTAL_DATA.getSymbol()); bytesWritten = store.getCounterReference( StreamStatisticNames.STREAM_WRITE_BYTES); ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java ########## @@ -376,27 +384,31 @@ private synchronized void writeCurrentBufferToService() throws IOException { position += bytesLength; if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) { - long start = System.currentTimeMillis(); - waitForTaskToComplete(); - outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); + //Tracking time spent on waiting for task to complete. 
+ try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) { + waitForTaskToComplete(); + } } - final Future<Void> job = completionService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - AbfsPerfTracker tracker = client.getAbfsPerfTracker(); - try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { - AbfsRestOperation op = client.append(path, offset, bytes, 0, - bytesLength, cachedSasToken.get(), false); - cachedSasToken.update(op.getSasToken()); - perfInfo.registerResult(op.getResult()); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); - perfInfo.registerSuccess(true); - return null; - } - } - }); + final Future<Void> job = + completionService.submit(IOStatisticsBinding + .trackDurationOfCallable((IOStatisticsStore) ioStatistics, + AbfsStatistic.TIME_SPENT_ON_PUT_REQUEST.getStatName(), + () -> { + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); Review comment: given we are wrapping callables with callables, maybe the Abfs Perf tracker could join in. Not needed for this patch, but later... 
########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java ########## @@ -160,16 +188,7 @@ public long getWriteCurrentBufferOperations() { @Override public String toString() { final StringBuilder outputStreamStats = new StringBuilder( "OutputStream Statistics{"); - outputStreamStats.append(", bytes_upload=").append(bytesToUpload); - outputStreamStats.append(", bytes_upload_successfully=") - .append(bytesUploadSuccessful); - outputStreamStats.append(", bytes_upload_failed=") - .append(bytesUploadFailed); - outputStreamStats.append(", time_spent_task_wait=") - .append(timeSpentOnTaskWait); - outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps); - outputStreamStats.append(", write_current_buffer_ops=") - .append(writeCurrentBufferOperations); + outputStreamStats.append(IOStatisticsLogging.ioStatisticsSourceToString(ioStatisticsStore)); Review comment: ioStatisticsStore.toString() does that too; what you get through the logging API is resilience to failures ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java ########## @@ -161,55 +170,92 @@ public void remoteBytesRead(long bytes) { */ @Override public void remoteReadOperation() { - remoteReadOperations++; + ioStatisticsStore.incrementCounter(getStatName(REMOTE_READ_OP)); } + /** + * Getter for IOStatistics instance used. + * @return IOStatisticsStore instance which extends IOStatistics. 
+ */ + @Override + public IOStatistics getIOStatistics() { + return ioStatisticsStore; + } + + @VisibleForTesting public long getSeekOperations() { - return seekOperations; + return ioStatisticsStore.counters().get(STREAM_READ_SEEK_OPERATIONS); } + @VisibleForTesting public long getForwardSeekOperations() { - return forwardSeekOperations; + return ioStatisticsStore.counters().get(STREAM_READ_SEEK_FORWARD_OPERATIONS); } + @VisibleForTesting public long getBackwardSeekOperations() { - return backwardSeekOperations; + return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BACKWARD_OPERATIONS); } + @VisibleForTesting public long getBytesRead() { - return bytesRead; + return ioStatisticsStore.counters().get(STREAM_READ_BYTES); } + @VisibleForTesting public long getBytesSkippedOnSeek() { - return bytesSkippedOnSeek; + return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BYTES_SKIPPED); } + @VisibleForTesting public long getBytesBackwardsOnSeek() { - return bytesBackwardsOnSeek; + return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BYTES_BACKWARDS); } + @VisibleForTesting public long getSeekInBuffer() { - return seekInBuffer; + return ioStatisticsStore.counters().get(getStatName(SEEK_IN_BUFFER)); + } + @VisibleForTesting public long getReadOperations() { - return readOperations; + return ioStatisticsStore.counters().get(STREAM_READ_OPERATIONS); } + @VisibleForTesting public long getBytesReadFromBuffer() { - return bytesReadFromBuffer; + return ioStatisticsStore.counters().get(getStatName(BYTES_READ_BUFFER)); } + @VisibleForTesting public long getRemoteReadOperations() { - return remoteReadOperations; + return ioStatisticsStore.counters().get(getStatName(REMOTE_READ_OP)); } + @VisibleForTesting public long getReadAheadBytesRead() { - return readAheadBytesRead; + return ioStatisticsStore.counters().get(getStatName(READ_AHEAD_BYTES_READ)); } + @VisibleForTesting public long getRemoteBytesRead() { - return remoteBytesRead; + return 
ioStatisticsStore.counters().get(getStatName(REMOTE_BYTES_READ)); + } + + @VisibleForTesting + public double getActionHttpGetRequest() { Review comment: even for testing, add a comment. Here's actually the mean value, isn't it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org