ptlrs commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r1870899158


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -618,6 +624,69 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
     return new XceiverClientReply(replyFuture);
   }
 
+  public XceiverClientReply sendCommandReadBlock(
+      ContainerCommandRequestProto request, DatanodeDetails dn)
+      throws IOException, InterruptedException {
+
+    CompletableFuture<ContainerCommandResponseProto> future =
+        new CompletableFuture<>();
+    ContainerCommandResponseProto.Builder response =
+        ContainerCommandResponseProto.newBuilder();
+    ContainerProtos.ReadBlockResponseProto.Builder readBlock =
+        ContainerProtos.ReadBlockResponseProto.newBuilder();
+    checkOpen(dn);
+    UUID dnID = dn.getUuid();
+    Type cmdType = request.getCmdType();
+    semaphore.acquire();
+    long requestTime = System.currentTimeMillis();
+    metrics.incrPendingContainerOpsMetrics(cmdType);
+
+    final StreamObserver<ContainerCommandRequestProto> requestObserver =
+        asyncStubs.get(dnID).withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(new StreamObserver<ContainerCommandResponseProto>() {
+              @Override
+              public void onNext(
+                  ContainerCommandResponseProto responseProto) {
+                if (responseProto.getResult() == Result.SUCCESS) {
+                  readBlock.addReadChunk(responseProto.getReadChunk());
+                } else {
+                  future.complete(
+                      ContainerCommandResponseProto.newBuilder(responseProto)
+                          .setCmdType(Type.ReadBlock).build());
+                }
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                future.completeExceptionally(t);
+                metrics.decrPendingContainerOpsMetrics(cmdType);
+                metrics.addContainerOpsLatency(
+                    cmdType, System.currentTimeMillis() - requestTime);

Review Comment:
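   Elapsed time should be measured with a monotonic clock: use
   [`System.nanoTime()`](https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--)
   or `Time.monotonicNow()`, as is done in `HddsDispatcher.java`.
   `System.currentTimeMillis()` returns wall-clock time, which can jump when the
   system clock is adjusted and would then skew the reported latency.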



##########
hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import com.google.common.primitives.Bytes;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link TestStreamBlockInputStream}'s functionality.
+ */
+public class TestStreamBlockInputStream {
+  private int blockSize;
+  private static final int CHUNK_SIZE = 100;
+  private static final int BYTES_PER_CHECKSUM = 20;

Review Comment:
   Can we add tests that cover the cases where `BYTES_PER_CHECKSUM` is equal to
   `CHUNK_SIZE` and where it is greater than `CHUNK_SIZE`?



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java:
##########
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
+
+/**
+ * An {@link java.io.InputStream} called from KeyInputStream to read a block 
from the
+ * container.
+ */
+public class StreamBlockInputStream extends BlockExtendedInputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamBlockInputStream.class);
+  private final BlockID blockID;
+  private final long blockLength;
+  private final AtomicReference<Pipeline> pipelineRef =
+      new AtomicReference<>();
+  private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
+      new AtomicReference<>();
+  private XceiverClientFactory xceiverClientFactory;
+  private XceiverClientSpi xceiverClient;
+
+  private List<Long> bufferOffsets;
+  private int bufferIndex;
+  private long blockPosition = -1;
+  private List<ByteBuffer> buffers;
+  // Checks if the StreamBlockInputStream has already read data from the 
container.
+  private boolean allocated = false;
+  private long bufferOffsetWrtBlockData;
+  private long buffersSize;
+  private static final int EOF = -1;
+  private final List<XceiverClientSpi.Validator> validators;
+  private final boolean verifyChecksum;
+  private final Function<BlockID, BlockLocationInfo> refreshFunction;
+  private final RetryPolicy retryPolicy;
+  private int retries;
+
+
+  public StreamBlockInputStream(
+      BlockID blockID, long length, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, BlockLocationInfo> refreshFunction,
+      OzoneClientConfig config) throws IOException {
+    this.blockID = blockID;
+    LOG.debug("Initializing StreamBlockInputStream for block {}", blockID);
+    this.blockLength = length;
+    setPipeline(pipeline);
+    tokenRef.set(token);
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.validators = ContainerProtocolCalls.toValidatorList(
+        (request, response) -> validateBlock(response));
+    this.verifyChecksum = config.isChecksumVerify();
+    this.refreshFunction = refreshFunction;
+    this.retryPolicy =
+        HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
+            TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+
+  }
+
+
+  public BlockID getBlockID() {
+    return blockID;
+  }
+
+  public long getLength() {
+    return blockLength;
+  }
+
+  @Override
+  public synchronized long getPos() {
+    if (blockLength == 0) {
+      return 0;
+    }
+    if (blockPosition >= 0) {
+      return blockPosition;
+    }
+
+    if (buffersHaveData()) {
+      // BufferOffset w.r.t to BlockData + BufferOffset w.r.t buffers +
+      // Position of current Buffer
+      return bufferOffsetWrtBlockData + bufferOffsets.get(bufferIndex) +
+          buffers.get(bufferIndex).position();
+    }
+    if (allocated && !dataRemainingInBlock()) {
+      Preconditions.checkState(
+          bufferOffsetWrtBlockData + buffersSize == blockLength,
+          "EOF detected but not at the last byte of the chunk");
+      return blockLength;
+    }
+    if (buffersAllocated()) {
+      return bufferOffsetWrtBlockData + buffersSize;
+    }
+    return 0;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    int dataout = EOF;
+    int len = 1;
+    int available;
+    while (len > 0) {
+      try {
+        acquireClient();
+        available = prepareRead(1);
+        retries = 0;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException e) {
+        handleStorageContainerException(e);
+        continue;
+      } catch (IOException ioe) {
+        handleIOException(ioe);
+        continue;
+      }
+      if (available == EOF) {
+        // There is no more data in the chunk stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+      } else {
+        dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+      }
+
+      len -= available;
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+
+
+    return dataout;
+
+
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread 
and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get 
key
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    int total = 0;
+    int available;
+    while (len > 0) {
+      try {
+        acquireClient();
+        available = prepareRead(len);
+        retries = 0;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException e) {
+        handleStorageContainerException(e);
+        continue;
+      } catch (IOException ioe) {
+        handleIOException(ioe);
+        continue;
+      }
+      if (available == EOF) {
+        // There is no more data in the block stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      buffers.get(bufferIndex).get(b, off + total, available);
+      len -= available;
+      total += available;
+
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+    return total;
+
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    if (byteBuffer == null) {
+      throw new NullPointerException();
+    }
+    int len = byteBuffer.remaining();
+    if (len == 0) {
+      return 0;
+    }
+    int total = 0;
+    int available;
+    while (len > 0) {
+      try {
+        acquireClient();
+        available = prepareRead(len);
+        retries = 0;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException e) {
+        handleStorageContainerException(e);
+        continue;
+      } catch (IOException ioe) {
+        handleIOException(ioe);
+        continue;
+      }
+      if (available == EOF) {
+        // There is no more data in the block stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      ByteBuffer readBuf = buffers.get(bufferIndex);
+      ByteBuffer tmpBuf = readBuf.duplicate();
+      tmpBuf.limit(tmpBuf.position() + available);
+      byteBuffer.put(tmpBuf);
+      readBuf.position(tmpBuf.position());
+
+      len -= available;
+      total += available;
+
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+    return total;
+  }
+
+  @Override
+  protected int readWithStrategy(ByteReaderStrategy strategy) throws 
IOException {
+    throw new NotImplementedException("readWithStrategy is not implemented.");
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos == 0 && blockLength == 0) {
+      // It is possible for length and pos to be zero in which case
+      // seek should return instead of throwing exception
+      return;
+    }
+    if (pos < 0 || pos > blockLength) {
+      throw new EOFException("EOF encountered at pos: " + pos + " for block: " 
+ blockID);
+    }
+
+    if (buffersHavePosition(pos)) {
+      // The bufferPosition is w.r.t the current block.
+      // Adjust the bufferIndex and position to the seeked position.
+      adjustBufferPosition(pos - bufferOffsetWrtBlockData);
+    } else {
+      blockPosition = pos;
+    }
+  }
+
+  @Override
+  public synchronized boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void unbuffer() {
+    blockPosition = getPos();
+    releaseClient();
+    releaseBuffers();
+  }
+
+  private void setPipeline(Pipeline pipeline) throws IOException {
+    if (pipeline == null) {
+      return;
+    }
+    long replicaIndexes = 
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
+
+    if (replicaIndexes > 1) {
+      throw new IOException(String.format("Pipeline: %s has nodes containing 
different replica indexes.",
+          pipeline));
+    }
+
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    boolean okForRead =
+        pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
+            || pipeline.getType() == HddsProtos.ReplicationType.EC;
+    Pipeline readPipeline = okForRead ? pipeline : 
Pipeline.newBuilder(pipeline)
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+            getLegacyFactor(pipeline.getReplicationConfig())))
+        .build();
+    pipelineRef.set(readPipeline);
+  }
+
+  protected synchronized void checkOpen() throws IOException {
+    if (xceiverClientFactory == null) {
+      throw new IOException("BlockInputStream has been closed.");
+    }
+  }
+
+  protected synchronized void acquireClient() throws IOException {
+    checkOpen();
+    if (xceiverClient == null) {
+      final Pipeline pipeline = pipelineRef.get();
+      try {
+        xceiverClient = 
xceiverClientFactory.acquireClientForReadData(pipeline);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to acquire client for pipeline {}, block {}",
+            pipeline, blockID);
+        throw ioe;
+      }
+    }
+  }
+
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (blockPosition >= 0) {
+        if (buffersHavePosition(blockPosition)) {
+          // The current buffers have the seeked position. Adjust the buffer
+          // index and position to point to the buffer position.
+          adjustBufferPosition(blockPosition - bufferOffsetWrtBlockData);
+        } else {
+          // Read a required block data to fill the buffers with seeked
+          // position data
+          readDataFromContainer(len);
+        }
+      }
+      if (buffersHaveData()) {
+        // Data is available from buffers
+        ByteBuffer bb = buffers.get(bufferIndex);
+        return Math.min(len, bb.remaining());
+      } else if (dataRemainingInBlock()) {
+        // There is more data in the block stream which has not
+        // been read into the buffers yet.
+        readDataFromContainer(len);
+      } else {
+        // All available input from this block stream has been consumed.
+        return EOF;
+      }
+    }
+
+
+  }
+
+  private boolean buffersHavePosition(long pos) {
+    // Check if buffers have been allocated
+    if (buffersAllocated()) {
+      // Check if the current buffers cover the input position
+      // Released buffers should not be considered when checking if position
+      // is available
+      return pos >= bufferOffsetWrtBlockData +
+          bufferOffsets.get(0) &&
+          pos < bufferOffsetWrtBlockData + buffersSize;
+    }
+    return false;
+  }
+
+  /**
+   * Check if the buffers have been allocated data and false otherwise.
+   */
+  @VisibleForTesting
+  protected boolean buffersAllocated() {
+    return buffers != null && !buffers.isEmpty();
+  }
+
+  /**
+   * Adjust the buffers position to account for seeked position and/ or 
checksum
+   * boundary reads.
+   * @param bufferPosition the position to which the buffers must be advanced
+   */
+  private void adjustBufferPosition(long bufferPosition) {
+    // The bufferPosition is w.r.t the current buffers.
+    // Adjust the bufferIndex and position to the seeked bufferPosition.
+    if (bufferIndex >= buffers.size()) {
+      bufferIndex = Collections.binarySearch(bufferOffsets, bufferPosition);
+    } else if (bufferPosition < bufferOffsets.get(bufferIndex)) {
+      bufferIndex = Collections.binarySearch(
+          bufferOffsets.subList(0, bufferIndex), bufferPosition);
+    } else if (bufferPosition >= bufferOffsets.get(bufferIndex) +
+        buffers.get(bufferIndex).capacity()) {
+      bufferIndex = Collections.binarySearch(bufferOffsets.subList(
+          bufferIndex + 1, buffers.size()), bufferPosition);
+    }

Review Comment:
   Can these three conditions be combined into a single binary search over the
   entire `bufferOffsets` list to simplify the code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to