This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c6724537d7 HDDS-10387. Fix parameter number warning in KeyOutputStream and related classes (#6225)
c6724537d7 is described below

commit c6724537d7413d5244bd3843fe7170d954d5d77e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Feb 19 21:59:47 2024 -0800

    HDDS-10387. Fix parameter number warning in KeyOutputStream and related classes (#6225)
---
 .../ozone/client/io/BlockOutputStreamEntry.java    |  62 +++++-----
 .../client/io/BlockOutputStreamEntryPool.java      |  65 +++--------
 .../ozone/client/io/ECBlockOutputStreamEntry.java  | 101 +----------------
 .../client/io/ECBlockOutputStreamEntryPool.java    |  49 ++------
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 125 ++++++---------------
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  72 +++---------
 .../client/io/TestECBlockOutputStreamEntry.java    |  16 +--
 .../hadoop/ozone/client/OzoneOutputStreamStub.java |   7 +-
 8 files changed, 117 insertions(+), 380 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 9bdec27f53..c0221d07a5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.JavaUtils;
 
 /**
  * A BlockOutputStreamEntry manages the data writes into the DataNodes.
@@ -60,33 +61,28 @@ public class BlockOutputStreamEntry extends OutputStream {
   private long currentPosition;
   private final Token<OzoneBlockTokenIdentifier> token;
 
-  private BufferPool bufferPool;
-  private ContainerClientMetrics clientMetrics;
-  private StreamBufferArgs streamBufferArgs;
-
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  BlockOutputStreamEntry(
-      BlockID blockID, String key,
-      XceiverClientFactory xceiverClientManager,
-      Pipeline pipeline,
-      long length,
-      BufferPool bufferPool,
-      Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
+  private final BufferPool bufferPool;
+  private final ContainerClientMetrics clientMetrics;
+  private final StreamBufferArgs streamBufferArgs;
+
+  BlockOutputStreamEntry(Builder b) {
+    this.config = b.config;
     this.outputStream = null;
-    this.blockID = blockID;
-    this.key = key;
-    this.xceiverClientManager = xceiverClientManager;
-    this.pipeline = pipeline;
-    this.token = token;
-    this.length = length;
+    this.blockID = b.blockID;
+    this.key = b.key;
+    this.xceiverClientManager = b.xceiverClientManager;
+    this.pipeline = b.pipeline;
+    this.token = b.token;
+    this.length = b.length;
     this.currentPosition = 0;
-    this.bufferPool = bufferPool;
-    this.clientMetrics = clientMetrics;
-    this.streamBufferArgs = streamBufferArgs;
+    this.bufferPool = b.bufferPool;
+    this.clientMetrics = b.clientMetrics;
+    this.streamBufferArgs = b.streamBufferArgs;
+  }
+
+  @Override
+  public String toString() {
+    return JavaUtils.getClassSimpleName(getClass()) + ":" + key + " " + 
blockID;
   }
 
   /**
@@ -362,6 +358,14 @@ public class BlockOutputStreamEntry extends OutputStream {
     private ContainerClientMetrics clientMetrics;
     private StreamBufferArgs streamBufferArgs;
 
+    public Pipeline getPipeline() {
+      return pipeline;
+    }
+
+    public long getLength() {
+      return length;
+    }
+
     public Builder setBlockID(BlockID bID) {
       this.blockID = bID;
       return this;
@@ -412,13 +416,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     }
 
     public BlockOutputStreamEntry build() {
-      return new BlockOutputStreamEntry(blockID,
-          key,
-          xceiverClientManager,
-          pipeline,
-          length,
-          bufferPool,
-          token, config, clientMetrics, streamBufferArgs);
+      return new BlockOutputStreamEntry(this);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index d0f3b5728a..4d6026f925 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 
-import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -62,7 +61,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
   /**
    * List of stream entries that are used to write a block of data.
    */
-  private final List<BlockOutputStreamEntry> streamEntries;
+  private final List<BlockOutputStreamEntry> streamEntries = new ArrayList<>();
   private final OzoneClientConfig config;
   /**
    * The actual stream entry we are writing into. Note that a stream entry is
@@ -73,7 +72,6 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
   private final OzoneManagerProtocol omClient;
   private final OmKeyArgs keyArgs;
   private final XceiverClientFactory xceiverClientFactory;
-  private final String requestID;
   /**
    * A {@link BufferPool} shared between all
    * {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
@@ -86,39 +84,31 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
   private final ContainerClientMetrics clientMetrics;
   private final StreamBufferArgs streamBufferArgs;
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public BlockOutputStreamEntryPool(
-      OzoneClientConfig config,
-      OzoneManagerProtocol omClient,
-      String requestId, ReplicationConfig replicationConfig,
-      String uploadID, int partNumber,
-      boolean isMultipart, OmKeyInfo info,
-      boolean unsafeByteBufferConversion,
-      XceiverClientFactory xceiverClientFactory, long openID,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
-    this.xceiverClientFactory = xceiverClientFactory;
-    streamEntries = new ArrayList<>();
+  public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
+    this.config = b.getClientConfig();
+    this.xceiverClientFactory = b.getXceiverManager();
     currentStreamIndex = 0;
-    this.omClient = omClient;
+    this.omClient = b.getOmClient();
+    final OmKeyInfo info = b.getOpenHandler().getKeyInfo();
     this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
         .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
-        
.setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
-        .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
-        .setMultipartUploadPartNumber(partNumber).build();
-    this.requestID = requestId;
-    this.openID = openID;
+        .setReplicationConfig(b.getReplicationConfig())
+        .setDataSize(info.getDataSize())
+        .setIsMultipartKey(b.isMultipartKey())
+        .setMultipartUploadID(b.getMultipartUploadID())
+        .setMultipartUploadPartNumber(b.getMultipartNumber())
+        .build();
+    this.openID = b.getOpenHandler().getId();
     this.excludeList = createExcludeList();
 
+    this.streamBufferArgs = b.getStreamBufferArgs();
     this.bufferPool =
         new BufferPool(streamBufferArgs.getStreamBufferSize(),
             (int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
                 .getStreamBufferSize()),
             ByteStringConversion
-                .createByteBufferConversion(unsafeByteBufferConversion));
-    this.clientMetrics = clientMetrics;
-    this.streamBufferArgs = streamBufferArgs;
+                
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
+    this.clientMetrics = b.getClientMetrics();
   }
 
   ExcludeList createExcludeList() {
@@ -126,25 +116,6 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
         Clock.system(ZoneOffset.UTC));
   }
 
-  BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
-      OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
-    streamEntries = new ArrayList<>();
-    omClient = null;
-    keyArgs = null;
-    xceiverClientFactory = null;
-    config = clientConfig;
-    streamBufferArgs.setStreamBufferFlushDelay(false);
-    requestID = null;
-    int chunkSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-
-    currentStreamIndex = 0;
-    openID = -1;
-    excludeList = createExcludeList();
-    this.clientMetrics = clientMetrics;
-    this.streamBufferArgs = null;
-  }
-
   /**
    * When a key is opened, it is possible that there are some blocks already
    * allocated to it for this open session. In this case, to make use of these
@@ -156,10 +127,8 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    *
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
    */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long 
openVersion) {
     // server may return any number of blocks, (0 to any)
     // only the blocks allocated in this open session (block createVersion
     // equals to open session version)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 07d0f46069..7f6ce87d60 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -23,17 +23,10 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,19 +68,10 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
   private int currentStreamIdx = 0;
   private long successfulBlkGrpAckedLen;
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  ECBlockOutputStreamEntry(BlockID blockID, String key,
-      XceiverClientFactory xceiverClientManager, Pipeline pipeline, long 
length,
-      BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, ContainerClientMetrics clientMetrics,
-      StreamBufferArgs streamBufferArgs) {
-    super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
-        token, config, clientMetrics, streamBufferArgs);
-    assertInstanceOf(
-        pipeline.getReplicationConfig(), ECReplicationConfig.class);
-    this.replicationConfig =
-        (ECReplicationConfig) pipeline.getReplicationConfig();
-    this.length = replicationConfig.getData() * length;
+  ECBlockOutputStreamEntry(Builder b) {
+    super(b);
+    this.replicationConfig = 
assertInstanceOf(b.getPipeline().getReplicationConfig(), 
ECReplicationConfig.class);
+    this.length = replicationConfig.getData() * b.getLength();
   }
 
   @Override
@@ -433,82 +417,9 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
   /**
    * Builder class for ChunkGroupOutputStreamEntry.
    * */
-  public static class Builder {
-    private BlockID blockID;
-    private String key;
-    private XceiverClientFactory xceiverClientManager;
-    private Pipeline pipeline;
-    private long length;
-    private BufferPool bufferPool;
-    private Token<OzoneBlockTokenIdentifier> token;
-    private OzoneClientConfig config;
-    private ContainerClientMetrics clientMetrics;
-    private StreamBufferArgs streamBufferArgs;
-
-    public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
-      this.blockID = bID;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setKey(String keys) {
-      this.key = keys;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
-        XceiverClientFactory
-            xClientManager) {
-      this.xceiverClientManager = xClientManager;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
-      this.pipeline = ppln;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setLength(long len) {
-      this.length = len;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
-      this.bufferPool = pool;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setConfig(
-        OzoneClientConfig clientConfig) {
-      this.config = clientConfig;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setToken(
-        Token<OzoneBlockTokenIdentifier> bToken) {
-      this.token = bToken;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setClientMetrics(
-        ContainerClientMetrics containerClientMetrics) {
-      this.clientMetrics = containerClientMetrics;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
-        StreamBufferArgs args) {
-      this.streamBufferArgs = args;
-      return this;
-    }
-
+  public static class Builder extends BlockOutputStreamEntry.Builder {
     public ECBlockOutputStreamEntry build() {
-      return new ECBlockOutputStreamEntry(blockID,
-          key,
-          xceiverClientManager,
-          pipeline,
-          length,
-          bufferPool,
-          token, config, clientMetrics, streamBufferArgs);
+      return new ECBlockOutputStreamEntry(this);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index e551605d84..e278097a49 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -17,19 +17,7 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-
-import java.time.Clock;
-import java.time.ZoneOffset;
 
 /**
  * {@link BlockOutputStreamEntryPool} is responsible to manage OM communication
@@ -44,37 +32,14 @@ import java.time.ZoneOffset;
  * @see ECBlockOutputStreamEntry
  */
 public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
-
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
-      OzoneManagerProtocol omClient,
-      String requestId,
-      ReplicationConfig replicationConfig,
-      String uploadID,
-      int partNumber,
-      boolean isMultipart,
-      OmKeyInfo info,
-      boolean unsafeByteBufferConversion,
-      XceiverClientFactory xceiverClientFactory,
-      long openID,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs) 
{
-    super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
-        isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
-        openID, clientMetrics, streamBufferArgs);
-    assert replicationConfig instanceof ECReplicationConfig;
-  }
-
-  @Override
-  ExcludeList createExcludeList() {
-    return new ExcludeList(getConfig().getExcludeNodesExpiryTime(),
-        Clock.system(ZoneOffset.UTC));
+  public ECBlockOutputStreamEntryPool(ECKeyOutputStream.Builder builder) {
+    super(builder);
   }
 
   @Override
-  BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
-    return
-        new ECBlockOutputStreamEntry.Builder()
-            .setBlockID(subKeyInfo.getBlockID())
+  ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+    final ECBlockOutputStreamEntry.Builder b = new 
ECBlockOutputStreamEntry.Builder();
+    b.setBlockID(subKeyInfo.getBlockID())
             .setKey(getKeyName())
             .setXceiverClientManager(getXceiverClientFactory())
             .setPipeline(subKeyInfo.getPipeline())
@@ -83,8 +48,8 @@ public class ECBlockOutputStreamEntryPool extends 
BlockOutputStreamEntryPool {
             .setBufferPool(getBufferPool())
             .setToken(subKeyInfo.getToken())
             .setClientMetrics(getClientMetrics())
-            .setStreamBufferArgs(getStreamBufferArgs())
-            .build();
+            .setStreamBufferArgs(getStreamBufferArgs());
+    return b.build();
   }
 
   @Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index b5c36474ff..878558073f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -17,12 +17,28 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -35,30 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.om.protocol.S3Auth;
-import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
-import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * ECKeyOutputStream handles the EC writes by writing the data into underlying
  * block output streams chunk by chunk.
@@ -100,22 +92,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private long offset;
   // how much data has been ingested into the stream
   private long writeOffset;
-  private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
-
-  @VisibleForTesting
-  public List<BlockOutputStreamEntry> getStreamEntries() {
-    return blockOutputStreamEntryPool.getStreamEntries();
-  }
-
-  @VisibleForTesting
-  public XceiverClientFactory getXceiverClientFactory() {
-    return blockOutputStreamEntryPool.getXceiverClientFactory();
-  }
-
-  @VisibleForTesting
-  public List<OmKeyLocationInfo> getLocationInfoList() {
-    return blockOutputStreamEntryPool.getLocationInfoList();
-  }
 
   @VisibleForTesting
   public void insertFlushCheckpoint(long version) throws IOException {
@@ -128,8 +104,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
   }
 
   private ECKeyOutputStream(Builder builder) {
-    super(builder.getReplicationConfig(), builder.getClientMetrics(),
-        builder.getClientConfig(), builder.getStreamBufferArgs());
+    super(builder.getReplicationConfig(), new 
ECBlockOutputStreamEntryPool(builder));
     this.config = builder.getClientConfig();
     this.bufferPool = builder.getByteBufferPool();
     // For EC, cell/chunk size and buffer size can be same for now.
@@ -140,16 +115,6 @@ public final class ECKeyOutputStream extends 
KeyOutputStream
         ecChunkSize, numDataBlks, numParityBlks, bufferPool);
     chunkIndex = 0;
     ecStripeQueue = new ArrayBlockingQueue<>(config.getEcStripeQueueSize());
-    OmKeyInfo info = builder.getOpenHandler().getKeyInfo();
-    blockOutputStreamEntryPool =
-        new ECBlockOutputStreamEntryPool(config,
-            builder.getOmClient(), builder.getRequestID(),
-            builder.getReplicationConfig(),
-            builder.getMultipartUploadID(), builder.getMultipartNumber(),
-            builder.isMultipartKey(),
-            info, builder.isUnsafeByteBufferConversionEnabled(),
-            builder.getXceiverManager(), builder.getOpenHandler().getId(),
-            builder.getClientMetrics(), builder.getStreamBufferArgs());
 
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
@@ -164,22 +129,9 @@ public final class ECKeyOutputStream extends 
KeyOutputStream
     this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
 
-  /**
-   * When a key is opened, it is possible that there are some blocks already
-   * allocated to it for this open session. In this case, to make use of these
-   * blocks, we need to add these blocks to stream entries. But, a key's 
version
-   * also includes blocks from previous versions, we need to avoid adding these
-   * old blocks to stream entries, because these old blocks should not be 
picked
-   * for write. To do this, the following method checks that, only those
-   * blocks created in this particular open version are added to stream 
entries.
-   *
-   * @param version     the set of blocks that are pre-allocated.
-   * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
-   */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
-    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  @Override
+  protected ECBlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+    return (ECBlockOutputStreamEntryPool) 
super.getBlockOutputStreamEntryPool();
   }
 
   /**
@@ -218,6 +170,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
     final ByteBuffer[] dataBuffers = stripe.getDataBuffers();
     offset -= Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
 
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
     final ECBlockOutputStreamEntry failedStreamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
     failedStreamEntry.resetToFirstEntry();
@@ -256,8 +209,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private StripeWriteStatus commitStripeWrite(ECChunkBuffers stripe)
       throws IOException {
 
-    ECBlockOutputStreamEntry streamEntry =
-        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    final ECBlockOutputStreamEntry streamEntry = 
getBlockOutputStreamEntryPool().getCurrentStreamEntry();
     List<ECBlockOutputStream> failedStreams =
         streamEntry.streamsWithWriteFailure();
     if (!failedStreams.isEmpty()) {
@@ -297,6 +249,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
       List<ECBlockOutputStream> failedStreams) {
 
     // Exclude the failed pipeline
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
     blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId());
 
     // If the failure is NOT caused by other reasons (e.g. container full),
@@ -362,6 +315,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
   }
 
   private void writeDataCells(ECChunkBuffers stripe) throws IOException {
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
     blockOutputStreamEntryPool.allocateBlockIfNeeded();
     ByteBuffer[] dataCells = stripe.getDataBuffers();
     for (int i = 0; i < numDataBlks; i++) {
@@ -374,6 +328,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
 
   private void writeParityCells(ECChunkBuffers stripe) {
     // Move the stream entry cursor to parity block index
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
     blockOutputStreamEntryPool
         .getCurrentStreamEntry().forceToFirstParityBlock();
     ByteBuffer[] parityCells = stripe.getParityBuffers();
@@ -413,7 +368,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
       // The len cannot be bigger than cell buffer size.
       assert buffer.limit() <= ecChunkSize : "The buffer size: " +
           buffer.limit() + " should not exceed EC chunk size: " + ecChunkSize;
-      writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
+      
writeToOutputStream(getBlockOutputStreamEntryPool().getCurrentStreamEntry(),
           buffer.array(), buffer.limit(), 0, isParity);
     } catch (Exception e) {
       markStreamAsFailed(e);
@@ -449,8 +404,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
     Preconditions.checkNotNull(t);
     boolean containerExclusionException = checkIfContainerToExclude(t);
     if (containerExclusionException) {
-      blockOutputStreamEntryPool.getExcludeList()
-          .addPipeline(streamEntry.getPipeline().getId());
+      
getBlockOutputStreamEntryPool().getExcludeList().addPipeline(streamEntry.getPipeline().getId());
     }
     markStreamAsFailed(exception);
   }
@@ -460,7 +414,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
   }
 
   private void markStreamAsFailed(Exception e) {
-    blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
+    getBlockOutputStreamEntryPool().getCurrentStreamEntry().markFailed(e);
   }
 
   @Override
@@ -470,6 +424,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
 
   private void closeCurrentStreamEntry()
       throws IOException {
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
     if (!blockOutputStreamEntryPool.isEmpty()) {
       while (true) {
         try {
@@ -503,6 +458,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
       return;
     }
     closed = true;
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
     try {
       if (!closing) {
         // If stripe buffer is not empty, encode and flush the stripe.
@@ -614,20 +570,6 @@ public final class ECKeyOutputStream extends 
KeyOutputStream
     buf.position(limit);
   }
 
-  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
-  }
-
-  @VisibleForTesting
-  public ExcludeList getExcludeList() {
-    return blockOutputStreamEntryPool.getExcludeList();
-  }
-
-  @Override
-  public Map<String, String> getMetadata() {
-    return this.blockOutputStreamEntryPool.getMetadata();
-  }
-
   /**
    * Builder class of ECKeyOutputStream.
    */
@@ -682,9 +624,8 @@ public final class ECKeyOutputStream extends KeyOutputStream
    */
   private void checkNotClosed() throws IOException {
     if (closing || closed) {
-      throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
-              + blockOutputStreamEntryPool.getKeyName());
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+          + getBlockOutputStreamEntryPool().getKeyName());
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 8b128e9cd9..9ea17cf8b2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -69,7 +69,6 @@ import org.slf4j.LoggerFactory;
 public class KeyOutputStream extends OutputStream
     implements Syncable, KeyMetadataAware {
 
-  private OzoneClientConfig config;
   private final ReplicationConfig replication;
 
   /**
@@ -105,11 +104,8 @@ public class KeyOutputStream extends OutputStream
    */
   private boolean atomicKeyCreation;
 
-  public KeyOutputStream(ReplicationConfig replicationConfig,
-      ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig,
-      StreamBufferArgs streamBufferArgs) {
+  public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
     this.replication = replicationConfig;
-    this.config = clientConfig;
     closed = false;
     this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
@@ -117,18 +113,16 @@ public class KeyOutputStream extends OutputStream
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     offset = 0;
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs);
+    this.blockOutputStreamEntryPool = blockOutputStreamEntryPool;
   }
 
-  @VisibleForTesting
-  public List<BlockOutputStreamEntry> getStreamEntries() {
-    return blockOutputStreamEntryPool.getStreamEntries();
+  protected BlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+    return blockOutputStreamEntryPool;
   }
 
   @VisibleForTesting
-  public XceiverClientFactory getXceiverClientFactory() {
-    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
   }
 
   @VisibleForTesting
@@ -146,39 +140,18 @@ public class KeyOutputStream extends OutputStream
     return clientID;
   }
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public KeyOutputStream(
-      OzoneClientConfig config,
-      OpenKeySession handler,
-      XceiverClientFactory xceiverClientManager,
-      OzoneManagerProtocol omClient,
-      String requestId, ReplicationConfig replicationConfig,
-      String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion,
-      ContainerClientMetrics clientMetrics,
-      boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
-    this.replication = replicationConfig;
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(
-            config,
-            omClient,
-            requestId, replicationConfig,
-            uploadID, partNumber,
-            isMultipart, handler.getKeyInfo(),
-            unsafeByteBufferConversion,
-            xceiverClientManager,
-            handler.getId(),
-            clientMetrics, streamBufferArgs);
+  public KeyOutputStream(Builder b) {
+    this.replication = b.replicationConfig;
+    this.blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(b);
+    final OzoneClientConfig config = b.getClientConfig();
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
     this.isException = false;
     this.writeOffset = 0;
-    this.clientID = handler.getId();
-    this.atomicKeyCreation = atomicKeyCreation;
-    this.streamBufferArgs = streamBufferArgs;
+    this.clientID = b.getOpenHandler().getId();
+    this.atomicKeyCreation = b.getAtomicKeyCreation();
+    this.streamBufferArgs = b.getStreamBufferArgs();
   }
 
   /**
@@ -192,10 +165,8 @@ public class KeyOutputStream extends OutputStream
    *
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
    */
-  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
+  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
     blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
   }
 
@@ -729,20 +700,7 @@ public class KeyOutputStream extends OutputStream
     }
 
     public KeyOutputStream build() {
-      return new KeyOutputStream(
-          clientConfig,
-          openHandler,
-          xceiverManager,
-          omClient,
-          requestID,
-          replicationConfig,
-          multipartUploadID,
-          multipartNumber,
-          isMultipartKey,
-          unsafeByteBufferConversion,
-          clientMetrics,
-          atomicKeyCreation,
-          streamBufferArgs);
+      return new KeyOutputStream(this);
     }
 
   }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
index 7760e88e48..718e724e58 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
@@ -63,10 +63,10 @@ public class TestECBlockOutputStreamEntry {
     try (XceiverClientManager manager =
         new XceiverClientManager(new OzoneConfiguration())) {
       HashSet<XceiverClientSpi> clients = new HashSet<>();
-      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
-          .setXceiverClientManager(manager)
-          .setPipeline(anECPipeline)
-          .build();
+      final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+      b.setXceiverClientManager(manager)
+          .setPipeline(anECPipeline);
+      final ECBlockOutputStreamEntry entry = b.build();
       for (int i = 0; i < nodes.size(); i++) {
         clients.add(
             manager.acquireClient(
@@ -101,10 +101,10 @@ public class TestECBlockOutputStreamEntry {
     try (XceiverClientManager manager =
         new XceiverClientManager(new OzoneConfiguration())) {
       HashSet<XceiverClientSpi> clients = new HashSet<>();
-      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
-          .setXceiverClientManager(manager)
-          .setPipeline(anECPipeline)
-          .build();
+      final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+      b.setXceiverClientManager(manager)
+          .setPipeline(anECPipeline);
+      final ECBlockOutputStreamEntry entry = b.build();
       for (int i = 0; i < nodes.size(); i++) {
         clients.add(
             manager.acquireClient(
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index da2fb26ec8..ca3caa4ee7 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -81,10 +79,7 @@ public class OzoneOutputStreamStub extends OzoneOutputStream {
     OzoneConfiguration conf = new OzoneConfiguration();
     ReplicationConfig replicationConfig =
         ReplicationConfig.getDefault(conf);
-    OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
-    StreamBufferArgs streamBufferArgs =
-        StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig);
-    return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) {
+    return new KeyOutputStream(replicationConfig, null) {
       @Override
       public synchronized OmMultipartCommitUploadPartInfo
           getCommitUploadPartInfo() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to