http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
deleted file mode 100644
index 2b10578..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.client.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.ratis.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Maintaining a list of ChunkInputStream. Read based on offset.
- */
-public class ChunkGroupInputStream extends InputStream implements Seekable {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ChunkGroupInputStream.class);
-
-  private static final int EOF = -1;
-
-  private final ArrayList<ChunkInputStreamEntry> streamEntries;
-  // streamOffset[i] stores the offset at which chunkInputStream i stores
-  // data in the key
-  private long[] streamOffset = null;
-  private int currentStreamIndex;
-  private long length = 0;
-  private boolean closed = false;
-  private String key;
-
-  public ChunkGroupInputStream() {
-    streamEntries = new ArrayList<>();
-    currentStreamIndex = 0;
-  }
-
-  @VisibleForTesting
-  public synchronized int getCurrentStreamIndex() {
-    return currentStreamIndex;
-  }
-
-  @VisibleForTesting
-  public long getRemainingOfIndex(int index) throws IOException {
-    return streamEntries.get(index).getRemaining();
-  }
-
-  /**
-   * Append another stream to the end of the list.
-   *
-   * @param stream       the stream instance.
-   * @param streamLength the max number of bytes that should be read from this
-   *                     stream.
-   */
-  public synchronized void addStream(ChunkInputStream stream,
-      long streamLength) {
-    streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
-  }
-
-
-  @Override
-  public synchronized int read() throws IOException {
-    byte[] buf = new byte[1];
-    if (read(buf, 0, 1) == EOF) {
-      return EOF;
-    }
-    return Byte.toUnsignedInt(buf[0]);
-  }
-
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    checkNotClosed();
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    int totalReadLen = 0;
-    while (len > 0) {
-      if (streamEntries.size() <= currentStreamIndex) {
-        return totalReadLen == 0 ? EOF : totalReadLen;
-      }
-      ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int numBytesToRead = Math.min(len, (int)current.getRemaining());
-      int numBytesRead = current.read(b, off, numBytesToRead);
-      if (numBytesRead != numBytesToRead) {
-        // This implies that there is either data loss or corruption in the
-        // chunk entries. Even EOF in the current stream would be covered in
-        // this case.
-        throw new IOException(String.format(
-            "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
-            current.chunkInputStream.getBlockID(), current.length,
-            numBytesRead));
-      }
-      totalReadLen += numBytesRead;
-      off += numBytesRead;
-      len -= numBytesRead;
-      if (current.getRemaining() <= 0) {
-        currentStreamIndex += 1;
-      }
-    }
-    return totalReadLen;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    checkNotClosed();
-    if (pos < 0 || pos >= length) {
-      if (pos == 0) {
-        // It is possible for length and pos to be zero in which case
-        // seek should return instead of throwing exception
-        return;
-      }
-      throw new EOFException(
-          "EOF encountered at pos: " + pos + " for key: " + key);
-    }
-    Preconditions.assertTrue(currentStreamIndex >= 0);
-    if (currentStreamIndex >= streamEntries.size()) {
-      currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
-    } else if (pos < streamOffset[currentStreamIndex]) {
-      currentStreamIndex =
-          Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
-    } else if (pos >= streamOffset[currentStreamIndex] + streamEntries
-        .get(currentStreamIndex).length) {
-      currentStreamIndex = Arrays
-          .binarySearch(streamOffset, currentStreamIndex + 1,
-              streamEntries.size(), pos);
-    }
-    if (currentStreamIndex < 0) {
-      // Binary search returns -insertionPoint - 1  if element is not present
-      // in the array. insertionPoint is the point at which element would be
-      // inserted in the sorted array. We need to adjust the currentStreamIndex
-      // accordingly so that currentStreamIndex = insertionPoint - 1
-      currentStreamIndex = -currentStreamIndex - 2;
-    }
-    // seek to the proper offset in the ChunkInputStream
-    streamEntries.get(currentStreamIndex)
-        .seek(pos - streamOffset[currentStreamIndex]);
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return length == 0 ? 0 :
-        streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
-            .getPos();
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return false;
-  }
-
-  @Override
-  public int available() throws IOException {
-    checkNotClosed();
-    long remaining = length - getPos();
-    return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
-  }
-
-  @Override
-  public void close() throws IOException {
-    closed = true;
-    for (int i = 0; i < streamEntries.size(); i++) {
-      streamEntries.get(i).close();
-    }
-  }
-
-  /**
-   * Encapsulates ChunkInputStream.
-   */
-  public static class ChunkInputStreamEntry extends InputStream
-      implements Seekable {
-
-    private final ChunkInputStream chunkInputStream;
-    private final long length;
-
-    public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
-        long length) {
-      this.chunkInputStream = chunkInputStream;
-      this.length = length;
-    }
-
-    synchronized long getRemaining() throws IOException {
-      return length - getPos();
-    }
-
-    @Override
-    public synchronized int read(byte[] b, int off, int len)
-        throws IOException {
-      int readLen = chunkInputStream.read(b, off, len);
-      return readLen;
-    }
-
-    @Override
-    public synchronized int read() throws IOException {
-      int data = chunkInputStream.read();
-      return data;
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-      chunkInputStream.close();
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      chunkInputStream.seek(pos);
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return chunkInputStream.getPos();
-    }
-
-    @Override
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
-    }
-  }
-
-  public static LengthInputStream getFromOmKeyInfo(
-      OmKeyInfo keyInfo,
-      XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocolClientSideTranslatorPB
-          storageContainerLocationClient,
-      String requestId) throws IOException {
-    long length = 0;
-    long containerKey;
-    ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
-    groupInputStream.key = keyInfo.getKeyName();
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
-    groupInputStream.streamOffset = new long[keyLocationInfos.size()];
-    for (int i = 0; i < keyLocationInfos.size(); i++) {
-      OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
-      BlockID blockID = omKeyLocationInfo.getBlockID();
-      long containerID = blockID.getContainerID();
-      ContainerWithPipeline containerWithPipeline =
-          storageContainerLocationClient.getContainerWithPipeline(containerID);
-      XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(containerWithPipeline.getPipeline(), containerID);
-      boolean success = false;
-      containerKey = omKeyLocationInfo.getLocalID();
-      try {
-        LOG.debug("get key accessing {} {}",
-            containerID, containerKey);
-        groupInputStream.streamOffset[i] = length;
-        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
-            .getDatanodeBlockIDProtobuf();
-        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId);
-        List<ContainerProtos.ChunkInfo> chunks =
-            response.getBlockData().getChunksList();
-        for (ContainerProtos.ChunkInfo chunk : chunks) {
-          length += chunk.getLen();
-        }
-        success = true;
-        ChunkInputStream inputStream = new ChunkInputStream(
-            omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
-            chunks, requestId);
-        groupInputStream.addStream(inputStream,
-            omKeyLocationInfo.getLength());
-      } finally {
-        if (!success) {
-          xceiverClientManager.releaseClient(xceiverClient);
-        }
-      }
-    }
-    groupInputStream.length = length;
-    return new LengthInputStream(groupInputStream, length);
-  }
-
-  /**
-   * Verify that the input stream is open. Non blocking; this gives
-   * the last state of the volatile {@link #closed} field.
-   * @throws IOException if the connection is closed.
-   */
-  private void checkNotClosed() throws IOException {
-    if (closed) {
-      throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
-    }
-  }
-}

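For reference, the seek(pos) logic removed above maps an absolute key offset to the block stream containing it: binary-search the sorted streamOffset array, and when pos is not an exact block boundary, convert the negative return value (-insertionPoint - 1) into insertionPoint - 1. A minimal, self-contained sketch of just that mapping; every name below is illustrative and nothing here is the deleted class itself:

import java.util.Arrays;

public final class OffsetLookupSketch {
  public static void main(String[] args) {
    // Hypothetical key layout: three blocks of 100, 150 and 50 bytes, so
    // block i starts at key offset streamOffset[i].
    long[] streamOffset = {0, 100, 250};

    for (long pos : new long[] {0, 99, 100, 260}) {
      int index = Arrays.binarySearch(streamOffset, pos);
      if (index < 0) {
        // binarySearch returns -insertionPoint - 1 when pos is not an exact
        // block boundary; the containing stream is insertionPoint - 1,
        // i.e. -index - 2, matching the adjustment in the deleted seek().
        index = -index - 2;
      }
      long offsetInStream = pos - streamOffset[index];
      System.out.println("pos " + pos + " -> stream " + index
          + ", offset " + offsetInStream);
    }
  }
}
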
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
deleted file mode 100644
index 3742a9a..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ /dev/null
@@ -1,733 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.client.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.ListIterator;
-
-/**
- * Maintaining a list of ChunkOutputStream. Write based on offset.
- *
- * Note that this may write to multiple containers in one write call. In case
- * that first container succeeded but later ones failed, the succeeded writes
- * are not rolled back.
- *
- * TODO : currently not support multi-thread access.
- */
-public class ChunkGroupOutputStream extends OutputStream {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(ChunkGroupOutputStream.class);
-
-  // array list's get(index) is O(1)
-  private final ArrayList<ChunkOutputStreamEntry> streamEntries;
-  private int currentStreamIndex;
-  private long byteOffset;
-  private final OzoneManagerProtocolClientSideTranslatorPB omClient;
-  private final
-      StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
-  private final OmKeyArgs keyArgs;
-  private final long openID;
-  private final XceiverClientManager xceiverClientManager;
-  private final int chunkSize;
-  private final String requestID;
-  private boolean closed;
-  private final RetryPolicy retryPolicy;
-  /**
-   * A constructor for testing purpose only.
-   */
-  @VisibleForTesting
-  public ChunkGroupOutputStream() {
-    streamEntries = new ArrayList<>();
-    omClient = null;
-    scmClient = null;
-    keyArgs = null;
-    openID = -1;
-    xceiverClientManager = null;
-    chunkSize = 0;
-    requestID = null;
-    closed = false;
-    retryPolicy = null;
-  }
-
-  /**
-   * For testing purposes only. Does not build the output stream from blocks,
-   * but takes one supplied externally.
-   *
-   * @param outputStream
-   * @param length
-   */
-  @VisibleForTesting
-  public void addStream(OutputStream outputStream, long length) {
-    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
-  }
-
-  @VisibleForTesting
-  public List<ChunkOutputStreamEntry> getStreamEntries() {
-    return streamEntries;
-  }
-
-  public List<OmKeyLocationInfo> getLocationInfoList() {
-    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
-    for (ChunkOutputStreamEntry streamEntry : streamEntries) {
-      OmKeyLocationInfo info =
-          new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
-              .setShouldCreateContainer(false)
-              .setLength(streamEntry.currentPosition).setOffset(0).build();
-      locationInfoList.add(info);
-    }
-    return locationInfoList;
-  }
-
-  public ChunkGroupOutputStream(
-      OpenKeySession handler, XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
-      OzoneManagerProtocolClientSideTranslatorPB omClient,
-      int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type, RetryPolicy retryPolicy) throws IOException {
-    this.streamEntries = new ArrayList<>();
-    this.currentStreamIndex = 0;
-    this.byteOffset = 0;
-    this.omClient = omClient;
-    this.scmClient = scmClient;
-    OmKeyInfo info = handler.getKeyInfo();
-    this.keyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(info.getVolumeName())
-        .setBucketName(info.getBucketName())
-        .setKeyName(info.getKeyName())
-        .setType(type)
-        .setFactor(factor)
-        .setDataSize(info.getDataSize()).build();
-    this.openID = handler.getId();
-    this.xceiverClientManager = xceiverClientManager;
-    this.chunkSize = chunkSize;
-    this.requestID = requestId;
-    this.retryPolicy = retryPolicy;
-    LOG.debug("Expecting open key with one block, but got" +
-        info.getKeyLocationVersions().size());
-  }
-
-  /**
-   * When a key is opened, it is possible that there are some blocks already
-   * allocated to it for this open session. In this case, to make use of these
-   * blocks, we need to add these blocks to stream entries. But a key's version
-   * also includes blocks from previous versions, so we need to avoid adding
-   * these old blocks to stream entries, because these old blocks should not be
-   * picked for write. To do this, the following method checks that only those
-   * blocks created in this particular open version are added to stream entries.
-   *
-   * @param version the set of blocks that are pre-allocated.
-   * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
-   */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
-    // The server may return any number of blocks (0 to any); only the blocks
-    // allocated in this open session (block createVersion equal to the open
-    // session version) are added as stream entries.
-    for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
-      if (subKeyInfo.getCreateVersion() == openVersion) {
-        checkKeyLocationInfo(subKeyInfo);
-      }
-    }
-  }
-
-  private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
-      throws IOException {
-    ContainerWithPipeline containerWithPipeline = scmClient
-        .getContainerWithPipeline(subKeyInfo.getContainerID());
-    ContainerInfo container = containerWithPipeline.getContainerInfo();
-
-    XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(),
-            container.getContainerID());
-    // create container if needed
-    if (subKeyInfo.getShouldCreateContainer()) {
-      try {
-        ContainerProtocolCalls.createContainer(xceiverClient,
-            container.getContainerID(), requestID);
-        scmClient.notifyObjectStageChange(
-            ObjectStageChangeRequestProto.Type.container,
-            subKeyInfo.getContainerID(),
-            ObjectStageChangeRequestProto.Op.create,
-            ObjectStageChangeRequestProto.Stage.complete);
-      } catch (StorageContainerException ex) {
-        if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
-          //container already exist, this should never happen
-          LOG.debug("Container {} already exists.",
-              container.getContainerID());
-        } else {
-          LOG.error("Container creation failed for {}.",
-              container.getContainerID(), ex);
-          throw ex;
-        }
-      }
-    }
-    streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
-        keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength()));
-  }
-
-  @VisibleForTesting
-  public long getByteOffset() {
-    return byteOffset;
-  }
-
-
-  @Override
-  public void write(int b) throws IOException {
-    byte[] buf = new byte[1];
-    buf[0] = (byte) b;
-    write(buf, 0, 1);
-  }
-
-  /**
-   * Try to write the bytes sequence b[off:off+len) to streams.
-   *
-   * NOTE: Throws exception if the data could not fit into the remaining space.
-   * In which case nothing will be written.
-   * TODO:May need to revisit this behaviour.
-   *
-   * @param b byte data
-   * @param off starting offset
-   * @param len length to write
-   * @throws IOException
-   */
-  @Override
-  public void write(byte[] b, int off, int len)
-      throws IOException {
-    checkNotClosed();
-    handleWrite(b, off, len);
-  }
-
-  private void handleWrite(byte[] b, int off, int len) throws IOException {
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return;
-    }
-    int succeededAllocates = 0;
-    while (len > 0) {
-      if (streamEntries.size() <= currentStreamIndex) {
-        Preconditions.checkNotNull(omClient);
-        // allocate a new block; if an exception happens, log an error and
-        // throw the exception to the caller directly, and the write fails.
-        try {
-          allocateNewBlock(currentStreamIndex);
-          succeededAllocates += 1;
-        } catch (IOException ioe) {
-          LOG.error("Try to allocate more blocks for write failed, already " +
-              "allocated " + succeededAllocates + " blocks for this write.");
-          throw ioe;
-        }
-      }
-      // in theory, this condition should never be violated due to the check
-      // above; still, do a sanity check.
-      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
-      ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int writeLen = Math.min(len, (int) current.getRemaining());
-      try {
-        current.write(b, off, writeLen);
-      } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe)) {
-          handleCloseContainerException(current, currentStreamIndex);
-          continue;
-        } else {
-          throw ioe;
-        }
-      }
-      if (current.getRemaining() <= 0) {
-        // since the current block is already written close the stream.
-        handleFlushOrClose(true);
-        currentStreamIndex += 1;
-      }
-      len -= writeLen;
-      off += writeLen;
-      byteOffset += writeLen;
-    }
-  }
-
-  private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
-      throws IOException {
-    long blockLength;
-    ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
-    RetryPolicy.RetryAction action;
-    int numRetries = 0;
-    while (true) {
-      try {
-        responseProto = ContainerProtocolCalls
-            .getCommittedBlockLength(streamEntry.xceiverClient,
-                streamEntry.blockID, requestID);
-        blockLength = responseProto.getBlockLength();
-        return blockLength;
-      } catch (StorageContainerException sce) {
-        try {
-          action = retryPolicy.shouldRetry(sce, numRetries, 0, true);
-        } catch (Exception e) {
-          throw e instanceof IOException ? (IOException) e : new IOException(e);
-        }
-        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-          if (action.reason != null) {
-            LOG.error(
-                "GetCommittedBlockLength request failed. " + action.reason,
-                sce);
-          }
-          throw sce;
-        }
-
-        // Throw the exception if the thread is interrupted
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.warn("Interrupted while trying for connection");
-          throw sce;
-        }
-        Preconditions.checkArgument(
-            action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
-        try {
-          Thread.sleep(action.delayMillis);
-        } catch (InterruptedException e) {
-          throw (IOException) new InterruptedIOException(
-              "Interrupted: action=" + action + ", retry policy=" + 
retryPolicy)
-              .initCause(e);
-        }
-        numRetries++;
-        LOG.trace("Retrying GetCommittedBlockLength request. Already tried "
-            + numRetries + " time(s); retry policy is " + retryPolicy);
-        continue;
-      }
-    }
-  }
-
-  /**
-   * Discards the subsequent pre allocated blocks and removes the streamEntries
-   * from the streamEntries list for the container which is closed.
-   * @param containerID id of the closed container
-   */
-  private void discardPreallocatedBlocks(long containerID) {
-    // currentStreamIndex < streamEntries.size() signifies that there are still
-    // pre allocated blocks available.
-    if (currentStreamIndex < streamEntries.size()) {
-      ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(currentStreamIndex);
-      while (streamEntryIterator.hasNext()) {
-        if (streamEntryIterator.next().blockID.getContainerID()
-            == containerID) {
-          streamEntryIterator.remove();
-        }
-      }
-    }
-  }
-
-  /**
-   * It is possible that the pre-allocated blocks never get written while the
-   * stream gets closed normally. In such cases, it is a good idea to trim down
-   * the locationInfoList by removing the unused blocks, if any, so that only
-   * the used block info gets updated on OzoneManager during close.
-   */
-  private void removeEmptyBlocks() {
-    if (currentStreamIndex < streamEntries.size()) {
-      ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(currentStreamIndex);
-      while (streamEntryIterator.hasNext()) {
-        if (streamEntryIterator.next().currentPosition == 0) {
-          streamEntryIterator.remove();
-        }
-      }
-    }
-  }
-  /**
-   * It performs the following actions:
-   * a. Updates the committed length for the current stream at the datanode.
-   * b. Reads the data from the underlying buffer and writes it to the next
-   *    stream.
-   *
-   * @param streamEntry StreamEntry
-   * @param streamIndex Index of the entry
-   * @throws IOException if the write fails
-   */
-  private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
-      int streamIndex) throws IOException {
-    long committedLength = 0;
-    ByteBuffer buffer = streamEntry.getBuffer();
-    if (buffer == null) {
-      // the buffer here will be null only when closeContainerException is
-      // hit while calling putKey during close on chunkOutputStream.
-      // Since closeContainer auto-commits pending keys, no need to do
-      // anything here.
-      return;
-    }
-
-    // In case where not a single chunk of data has been written to the Datanode
-    // yet. This block does not yet exist on the datanode but cached on the
-    // outputStream buffer. No need to call GetCommittedBlockLength here
-    // for this block associated with the stream here.
-    if (streamEntry.currentPosition >= chunkSize
-        || streamEntry.currentPosition != buffer.position()) {
-      committedLength = getCommittedBlockLength(streamEntry);
-      // update the length of the current stream
-      streamEntry.currentPosition = committedLength;
-    }
-
-    if (buffer.position() > 0) {
-      // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode. The cached
-      // data in the buffer does not exceed chunkSize.
-      Preconditions.checkState(buffer.position() < chunkSize);
-      currentStreamIndex += 1;
-      // readjust the byteOffset value to the length actually been written.
-      byteOffset -= buffer.position();
-      handleWrite(buffer.array(), 0, buffer.position());
-    }
-
-    // just clean up the current stream. Since the container is already closed,
-    // it will be auto committed. No need to call close again here.
-    streamEntry.cleanup();
-    // This case will arise when writing the first chunk itself fails.
-    // In such case, the current block associated with the stream has no data
-    // written. Remove it from the current stream list.
-    if (committedLength == 0) {
-      streamEntries.remove(streamIndex);
-      Preconditions.checkArgument(currentStreamIndex != 0);
-      currentStreamIndex -= 1;
-    }
-    // discard subsequent pre allocated blocks from the streamEntries list
-    // from the closed container
-    discardPreallocatedBlocks(streamEntry.blockID.getContainerID());
-  }
-
-  private boolean checkIfContainerIsClosed(IOException ioe) {
-    return Optional.of(ioe.getCause())
-        .filter(e -> e instanceof StorageContainerException)
-        .map(e -> (StorageContainerException) e)
-        .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
-        .isPresent();
-  }
-
-  private long getKeyLength() {
-    return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
-        .sum();
-  }
-
-  /**
-   * Contact OM to get a new block. Set the new block with the index (e.g.
-   * first block has index = 0, second has index = 1 etc.)
-   *
-   * The returned block is made to new ChunkOutputStreamEntry to write.
-   *
-   * @param index the index of the block.
-   * @throws IOException
-   */
-  private void allocateNewBlock(int index) throws IOException {
-    OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID);
-    checkKeyLocationInfo(subKeyInfo);
-  }
-
-  @Override
-  public void flush() throws IOException {
-    checkNotClosed();
-    handleFlushOrClose(false);
-  }
-
-  /**
-   * Close or Flush the latest outputStream.
-   * @param close Flag which decides whether to call close or flush on the
-   *              outputStream.
-   * @throws IOException in case flush or close fails with an exception.
-   */
-  private void handleFlushOrClose(boolean close) throws IOException {
-    if (streamEntries.size() == 0) {
-      return;
-    }
-    int size = streamEntries.size();
-    int streamIndex =
-        currentStreamIndex >= size ? size - 1 : currentStreamIndex;
-    ChunkOutputStreamEntry entry = streamEntries.get(streamIndex);
-    if (entry != null) {
-      try {
-        if (close) {
-          entry.close();
-        } else {
-          entry.flush();
-        }
-      } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe)) {
-          // This call will allocate a new streamEntry and write the Data.
-          // Close needs to be retried on the newly allocated streamEntry
-          // as well.
-          handleCloseContainerException(entry, streamIndex);
-          handleFlushOrClose(close);
-        } else {
-          throw ioe;
-        }
-      }
-    }
-  }
-
-  /**
-   * Commit the key to OM, this will add the blocks as the new key blocks.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    handleFlushOrClose(true);
-    if (keyArgs != null) {
-      // in test, this could be null
-      removeEmptyBlocks();
-      Preconditions.checkState(byteOffset == getKeyLength());
-      keyArgs.setDataSize(byteOffset);
-      keyArgs.setLocationInfoList(getLocationInfoList());
-      omClient.commitKey(keyArgs, openID);
-    } else {
-      LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
-    }
-  }
-
-  /**
-   * Builder class of ChunkGroupOutputStream.
-   */
-  public static class Builder {
-    private OpenKeySession openHandler;
-    private XceiverClientManager xceiverManager;
-    private StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
-    private OzoneManagerProtocolClientSideTranslatorPB omClient;
-    private int chunkSize;
-    private String requestID;
-    private ReplicationType type;
-    private ReplicationFactor factor;
-    private RetryPolicy retryPolicy;
-
-    public Builder setHandler(OpenKeySession handler) {
-      this.openHandler = handler;
-      return this;
-    }
-
-    public Builder setXceiverClientManager(XceiverClientManager manager) {
-      this.xceiverManager = manager;
-      return this;
-    }
-
-    public Builder setScmClient(
-        StorageContainerLocationProtocolClientSideTranslatorPB client) {
-      this.scmClient = client;
-      return this;
-    }
-
-    public Builder setOmClient(
-        OzoneManagerProtocolClientSideTranslatorPB client) {
-      this.omClient = client;
-      return this;
-    }
-
-    public Builder setChunkSize(int size) {
-      this.chunkSize = size;
-      return this;
-    }
-
-    public Builder setRequestID(String id) {
-      this.requestID = id;
-      return this;
-    }
-
-    public Builder setType(ReplicationType replicationType) {
-      this.type = replicationType;
-      return this;
-    }
-
-    public Builder setFactor(ReplicationFactor replicationFactor) {
-      this.factor = replicationFactor;
-      return this;
-    }
-
-    public ChunkGroupOutputStream build() throws IOException {
-      return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          omClient, chunkSize, requestID, factor, type, retryPolicy);
-    }
-
-    public Builder setRetryPolicy(RetryPolicy rPolicy) {
-      this.retryPolicy = rPolicy;
-      return this;
-    }
-
-  }
-
-  private static class ChunkOutputStreamEntry extends OutputStream {
-    private OutputStream outputStream;
-    private final BlockID blockID;
-    private final String key;
-    private final XceiverClientManager xceiverClientManager;
-    private final XceiverClientSpi xceiverClient;
-    private final String requestId;
-    private final int chunkSize;
-    // total number of bytes that should be written to this stream
-    private final long length;
-    // the current position of this stream 0 <= currentPosition < length
-    private long currentPosition;
-
-    ChunkOutputStreamEntry(BlockID blockID, String key,
-        XceiverClientManager xceiverClientManager,
-        XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length) {
-      this.outputStream = null;
-      this.blockID = blockID;
-      this.key = key;
-      this.xceiverClientManager = xceiverClientManager;
-      this.xceiverClient = xceiverClient;
-      this.requestId = requestId;
-      this.chunkSize = chunkSize;
-
-      this.length = length;
-      this.currentPosition = 0;
-    }
-
-    /**
-     * For testing purposes, taking some externally created stream instance.
-     * @param  outputStream an existing writable output stream
-     * @param  length the length of data to write to the stream
-     */
-    ChunkOutputStreamEntry(OutputStream outputStream, long length) {
-      this.outputStream = outputStream;
-      this.blockID = null;
-      this.key = null;
-      this.xceiverClientManager = null;
-      this.xceiverClient = null;
-      this.requestId = null;
-      this.chunkSize = -1;
-
-      this.length = length;
-      this.currentPosition = 0;
-    }
-
-    long getLength() {
-      return length;
-    }
-
-    long getRemaining() {
-      return length - currentPosition;
-    }
-
-    private void checkStream() {
-      if (this.outputStream == null) {
-        this.outputStream = new ChunkOutputStream(blockID,
-            key, xceiverClientManager, xceiverClient,
-            requestId, chunkSize);
-      }
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-      checkStream();
-      outputStream.write(b);
-      this.currentPosition += 1;
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-      checkStream();
-      outputStream.write(b, off, len);
-      this.currentPosition += len;
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if (this.outputStream != null) {
-        this.outputStream.flush();
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (this.outputStream != null) {
-        this.outputStream.close();
-      }
-    }
-
-    ByteBuffer getBuffer() throws IOException {
-      if (this.outputStream instanceof ChunkOutputStream) {
-        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
-        return out.getBuffer();
-      }
-      throw new IOException("Invalid Output Stream for Key: " + key);
-    }
-
-    public void cleanup() {
-      checkStream();
-      if (this.outputStream instanceof ChunkOutputStream) {
-        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
-        out.cleanup();
-      }
-    }
-
-  }
-
-  /**
-   * Verify that the output stream is open. Non blocking; this gives
-   * the last state of the volatile {@link #closed} field.
-   * @throws IOException if the connection is closed.
-   */
-  private void checkNotClosed() throws IOException {
-    if (closed) {
-      throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
-              .getKeyName());
-    }
-  }
-}

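getCommittedBlockLength in the class removed above retries through a Hadoop RetryPolicy until the call succeeds, the policy decides FAIL, or the thread is interrupted. A reduced plain-Java sketch of that loop shape; the fixed retry limit, delay, and all names are assumptions for illustration, not the Hadoop RetryPolicy API:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Callable;

public final class RetrySketch {

  // Retry `call` up to maxRetries times, sleeping delayMillis between
  // attempts; rethrow as IOException once retries are exhausted or the
  // thread is interrupted, mirroring the deleted loop's exit conditions.
  static <T> T callWithRetry(Callable<T> call, int maxRetries, long delayMillis)
      throws IOException {
    int attempts = 0;
    while (true) {
      try {
        return call.call();
      } catch (Exception e) {
        attempts++;
        if (attempts > maxRetries || Thread.currentThread().isInterrupted()) {
          throw e instanceof IOException ? (IOException) e : new IOException(e);
        }
        try {
          Thread.sleep(delayMillis);  // back off before the next attempt
        } catch (InterruptedException ie) {
          throw (IOException) new InterruptedIOException("Interrupted")
              .initCause(ie);
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    int[] calls = {0};  // mutable counter the lambda can update
    long committed = callWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("transient failure " + calls[0]);
      }
      return 1024L;  // stand-in for the committed block length
    }, 5, 10L);
    System.out.println("committed length = " + committed
        + " after " + calls[0] + " call(s)");
  }
}
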
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
deleted file mode 100644
index e1f65e6..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.io;
-
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * OzoneInputStream is used to read data from Ozone.
- * It uses SCM's {@link ChunkInputStream} for reading the data.
- */
-public class OzoneInputStream extends InputStream {
-
-  private final InputStream inputStream;
-
-  /**
-   * Constructs OzoneInputStream with ChunkInputStream.
-   *
-   * @param inputStream
-   */
-  public OzoneInputStream(InputStream inputStream) {
-    this.inputStream = inputStream;
-  }
-
-  @Override
-  public int read() throws IOException {
-    return inputStream.read();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return inputStream.read(b, off, len);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    inputStream.close();
-  }
-
-  @Override
-  public int available() throws IOException {
-    return inputStream.available();
-  }
-
-  public InputStream getInputStream() {
-    return inputStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
deleted file mode 100644
index 5369220..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * OzoneOutputStream is used to write data into Ozone.
- * It uses {@link ChunkGroupOutputStream} for writing the data.
- */
-public class OzoneOutputStream extends OutputStream {
-
-  private final OutputStream outputStream;
-
-  /**
-   * Constructs OzoneOutputStream with ChunkGroupOutputStream.
-   *
-   * @param outputStream
-   */
-  public OzoneOutputStream(OutputStream outputStream) {
-    this.outputStream = outputStream;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    outputStream.write(b);
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    outputStream.write(b, off, len);
-  }
-
-  @Override
-  public synchronized void flush() throws IOException {
-    outputStream.flush();
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    //commitKey can be done here, if needed.
-    outputStream.close();
-  }
-
-  public OutputStream getOutputStream() {
-    return outputStream;
-  }
-}

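OzoneInputStream and OzoneOutputStream above are plain decorators: every call is forwarded unchanged to the wrapped stream. A minimal usage sketch, assuming a client version that still ships these classes, with in-memory streams standing in for the real ChunkGroup-backed ones:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

public final class WrapperSketch {
  public static void main(String[] args) throws IOException {
    byte[] payload = "hello ozone".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();

    try (OzoneInputStream in =
             new OzoneInputStream(new ByteArrayInputStream(payload));
         OzoneOutputStream out = new OzoneOutputStream(sink)) {
      byte[] buf = new byte[4];
      int n;
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        out.write(buf, 0, n);  // forwarded unchanged to the wrapped stream
      }
    }
    System.out.println(sink.toString("UTF-8"));
  }
}
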
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
deleted file mode 100644
index 493ece8..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.io;
-
-/**
- * This package contains Ozone I/O classes.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
deleted file mode 100644
index 7e2591a..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client;
-
-/**
- * This package contains Ozone Client classes.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
deleted file mode 100644
index 008b69d..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.protocol;
-
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.client.*;
-import org.apache.hadoop.hdds.client.OzoneQuota;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * An implementer of this interface is capable of connecting to Ozone Cluster
- * and perform client operations. The protocol used for communication is
- * determined by the implementation class specified by
- * property <code>ozone.client.protocol</code>. The built-in implementations
- * include: {@link org.apache.hadoop.ozone.client.rpc.RpcClient} for RPC and
- * {@link  org.apache.hadoop.ozone.client.rest.RestClient} for REST.
- */
-public interface ClientProtocol {
-
-  /**
-   * Creates a new Volume.
-   * @param volumeName Name of the Volume
-   * @throws IOException
-   */
-  void createVolume(String volumeName)
-      throws IOException;
-
-  /**
-   * Creates a new Volume with properties set in VolumeArgs.
-   * @param volumeName Name of the Volume
-   * @param args Properties to be set for the Volume
-   * @throws IOException
-   */
-  void createVolume(String volumeName, VolumeArgs args)
-      throws IOException;
-
-  /**
-   * Sets the owner of volume.
-   * @param volumeName Name of the Volume
-   * @param owner to be set for the Volume
-   * @throws IOException
-   */
-  void setVolumeOwner(String volumeName, String owner) throws IOException;
-
-  /**
-   * Set Volume Quota.
-   * @param volumeName Name of the Volume
-   * @param quota Quota to be set for the Volume
-   * @throws IOException
-   */
-  void setVolumeQuota(String volumeName, OzoneQuota quota)
-      throws IOException;
-
-  /**
-   * Returns {@link OzoneVolume}.
-   * @param volumeName Name of the Volume
-   * @return {@link OzoneVolume}
-   * @throws IOException
-   * */
-  OzoneVolume getVolumeDetails(String volumeName)
-      throws IOException;
-
-  /**
-   * Checks if a Volume exists and the user with a role specified has access
-   * to the Volume.
-   * @param volumeName Name of the Volume
-   * @param acl requested acls which needs to be checked for access
-   * @return Boolean - True if the user with a role can access the volume.
-   * This is possible for owners of the volume and admin users
-   * @throws IOException
-   */
-  boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
-      throws IOException;
-
-  /**
-   * Deletes an empty Volume.
-   * @param volumeName Name of the Volume
-   * @throws IOException
-   */
-  void deleteVolume(String volumeName) throws IOException;
-
-  /**
-   * Lists all volumes in the cluster that match the volumePrefix; the
-   * size of the returned list depends on maxListResult. If volume prefix
-   * is null, returns all the volumes. The caller has to make multiple calls
-   * to read all volumes.
-   *
-   * @param volumePrefix Volume prefix to match
-   * @param prevVolume Starting point of the list, this volume is excluded
-   * @param maxListResult Max number of volumes to return.
-   * @return {@code List<OzoneVolume>}
-   * @throws IOException
-   */
-  List<OzoneVolume> listVolumes(String volumePrefix, String prevVolume,
-                                int maxListResult)
-      throws IOException;
-
-  /**
-   * Lists all volumes in the cluster that are owned by the specified
-   * user and match the volumePrefix; the size of the returned list depends on
-   * maxListResult. If the user is null, returns volumes owned by current user.
-   * If volume prefix is null, returns all the volumes. The caller has to make
-   * multiple calls to read all volumes.
-   *
-   * @param user User Name
-   * @param volumePrefix Volume prefix to match
-   * @param prevVolume Starting point of the list, this volume is excluded
-   * @param maxListResult Max number of volumes to return.
-   * @return {@code List<OzoneVolume>}
-   * @throws IOException
-   */
-  List<OzoneVolume> listVolumes(String user, String volumePrefix,
-                                    String prevVolume, int maxListResult)
-      throws IOException;
-
-  /**
-   * Creates a new Bucket in the Volume.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Creates a new Bucket in the Volume, with properties set in BucketArgs.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param bucketArgs Bucket Arguments
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName,
-                    BucketArgs bucketArgs)
-      throws IOException;
-
-  /**
-   * Adds ACLs to the Bucket.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param addAcls ACLs to be added
-   * @throws IOException
-   */
-  void addBucketAcls(String volumeName, String bucketName,
-                     List<OzoneAcl> addAcls)
-      throws IOException;
-
-  /**
-   * Removes ACLs from a Bucket.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param removeAcls ACLs to be removed
-   * @throws IOException
-   */
-  void removeBucketAcls(String volumeName, String bucketName,
-                        List<OzoneAcl> removeAcls)
-      throws IOException;
-
-
-  /**
-   * Enables or disables Bucket Versioning.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param versioning True to enable Versioning, False to disable.
-   * @throws IOException
-   */
-  void setBucketVersioning(String volumeName, String bucketName,
-                           Boolean versioning)
-      throws IOException;
-
-  /**
-   * Sets the Storage Class of a Bucket.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param storageType StorageType to be set
-   * @throws IOException
-   */
-  void setBucketStorageType(String volumeName, String bucketName,
-                            StorageType storageType)
-      throws IOException;
-
-  /**
-   * Deletes a bucket if it is empty.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @throws IOException
-   */
-  void deleteBucket(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Succeeds if the bucket exists and the user has read access
-   * to the bucket; otherwise throws an exception.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @throws IOException
-   */
-  void checkBucketAccess(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Returns {@link OzoneBucket}.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @return {@link OzoneBucket}
-   * @throws IOException
-   */
-  OzoneBucket getBucketDetails(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Returns the List of Buckets in the Volume that match the bucketPrefix;
-   * the size of the returned list depends on maxListResult. The caller has to
-   * make multiple calls to read all buckets.
-   * @param volumeName Name of the Volume
-   * @param bucketPrefix Bucket prefix to match
-   * @param prevBucket Starting point of the list, this bucket is excluded
-   * @param maxListResult Max number of buckets to return.
-   * @return {@code List<OzoneBucket>}
-   * @throws IOException
-   */
-  List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
-                                String prevBucket, int maxListResult)
-      throws IOException;
-
-  /**
-   * Creates a key in an existing bucket and returns an output stream for
-   * writing its data (see the usage sketch after this interface).
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param keyName Name of the Key
-   * @param size Size of the data
-   * @param type Replication Type to be used for the Key
-   * @param factor Replication Factor to be used for the Key
-   * @return {@link OzoneOutputStream}
-   * @throws IOException
-   */
-  OzoneOutputStream createKey(String volumeName, String bucketName,
-                              String keyName, long size, ReplicationType type,
-                              ReplicationFactor factor)
-      throws IOException;
-
-  /**
-   * Reads a key from an existing bucket.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param keyName Name of the Key
-   * @return {@link OzoneInputStream}
-   * @throws IOException
-   */
-  OzoneInputStream getKey(String volumeName, String bucketName, String keyName)
-      throws IOException;
-
-
-  /**
-   * Deletes an existing key.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param keyName Name of the Key
-   * @throws IOException
-   */
-  void deleteKey(String volumeName, String bucketName, String keyName)
-      throws IOException;
-
-  /**
-   * Renames an existing key within a bucket.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param fromKeyName Name of the Key to be renamed
-   * @param toKeyName New name to be used for the Key
-   * @throws IOException
-   */
-  void renameKey(String volumeName, String bucketName, String fromKeyName,
-      String toKeyName) throws IOException;
-
-  /**
-   * Returns the list of Keys in {Volume/Bucket} that match the keyPrefix;
-   * the size of the returned list is capped at maxListResult, so the caller
-   * has to make multiple calls to read all keys (the pagination sketch
-   * after this interface applies here as well).
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param keyPrefix Key prefix to match
-   * @param prevKey Starting point of the list, this key is excluded
-   * @param maxListResult Max number of keys to return.
-   * @return {@code List<OzoneKey>}
-   * @throws IOException
-   */
-  List<OzoneKey> listKeys(String volumeName, String bucketName,
-                          String keyPrefix, String prevKey, int maxListResult)
-      throws IOException;
-
-
-  /**
-   * Returns the details of an existing key.
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param keyName Name of the Key
-   * @return {@link OzoneKeyDetails}
-   * @throws IOException
-   */
-  OzoneKeyDetails getKeyDetails(String volumeName, String bucketName,
-                                String keyName)
-      throws IOException;
-
-  /**
-   * Closes the client and releases all associated resources.
-   */
-  void close() throws IOException;
-
-}
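
The listBuckets/listKeys contracts above are paginated, so a caller drives them in a loop. A minimal sketch of that loop, assuming an already-constructed implementation of this interface named client (the client variable, volume name, and PAGE_SIZE constant are illustrative, not part of the API):

    // Illustrative pagination over listBuckets; 'client' implements the
    // interface above and PAGE_SIZE is an assumed page size.
    final int PAGE_SIZE = 100;
    String prevBucket = null;                    // start from the beginning
    List<OzoneBucket> page;
    do {
      page = client.listBuckets("volume1", "", prevBucket, PAGE_SIZE);
      for (OzoneBucket bucket : page) {
        System.out.println(bucket.getName());    // OzoneBucket#getName assumed
      }
      if (!page.isEmpty()) {
        // prevBucket is excluded from results, so resume after the last
        // bucket seen on this page.
        prevBucket = page.get(page.size() - 1).getName();
      }
    } while (page.size() == PAGE_SIZE);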

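Likewise, a hedged sketch of a bucket/key round trip through this protocol. BucketArgs.newBuilder(), ReplicationType.RATIS, and ReplicationFactor.THREE are assumed from the surrounding Ozone/HDDS client API; the volume, bucket, and key names and the client variable are illustrative:

    // Illustrative write/read round trip; error handling omitted.
    byte[] data = "hello ozone".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    client.createBucket("volume1", "bucket1", BucketArgs.newBuilder().build());
    try (OzoneOutputStream out = client.createKey("volume1", "bucket1", "key1",
        data.length, ReplicationType.RATIS, ReplicationFactor.THREE)) {
      out.write(data);
    }
    byte[] readBack = new byte[data.length];
    try (OzoneInputStream in = client.getKey("volume1", "bucket1", "key1")) {
      int n = in.read(readBack);                 // a loop is needed for large keys
    }
    client.deleteKey("volume1", "bucket1", "key1");
    client.deleteBucket("volume1", "bucket1");   // only succeeds when empty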
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java
deleted file mode 100644
index f4890a1..0000000
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.protocol;
-
-/**
- * This package contains Ozone client protocol library classes.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java
deleted file mode 100644
index abdc2fb..0000000
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.rest;
-
-import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
-
-import java.util.List;
-import java.util.Random;
-
-/**
- * Default selector that randomly picks one of the REST Servers from the list.
- */
-public class DefaultRestServerSelector implements RestServerSelector {
-
-  @Override
-  public ServiceInfo getRestServer(List<ServiceInfo> restServices) {
-    // Pick a uniformly random server; a new Random instance per call is
-    // acceptable on this infrequent path.
-    return restServices.get(
-        new Random().nextInt(restServices.size()));
-  }
-}
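
Because RestServerSelector is a single-method interface, callers can plug in their own policy in place of the random default. A hypothetical round-robin variant (the class name is illustrative; only the RestServerSelector and ServiceInfo types come from this diff):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.hadoop.ozone.om.helpers.ServiceInfo;

    /**
     * Hypothetical selector that cycles through the REST servers in order.
     */
    public class RoundRobinRestServerSelector implements RestServerSelector {

      private final AtomicInteger next = new AtomicInteger();

      @Override
      public ServiceInfo getRestServer(List<ServiceInfo> restServices) {
        // floorMod keeps the index non-negative even after int overflow.
        int idx = Math.floorMod(next.getAndIncrement(), restServices.size());
        return restServices.get(idx);
      }
    }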

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java
deleted file mode 100644
index 6c479f7..0000000
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.rest;
-
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ExceptionMapper;
-
-import org.slf4j.MDC;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *  JAX-RS mapper that converts errors raised by the Object Store layer
- *  (OzoneException) into HTTP Responses.
- */
-public class OzoneExceptionMapper implements ExceptionMapper<OzoneException> {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OzoneExceptionMapper.class);
-
-  @Override
-  public Response toResponse(OzoneException exception) {
-    LOG.debug("Returning exception. ex: {}", exception.toJsonString());
-    // Clear per-request MDC state before the response is returned.
-    MDC.clear();
-    return Response.status((int)exception.getHttpCode())
-      .entity(exception.toJsonString()).build();
-  }
-
-}
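
An ExceptionMapper only takes effect once it is registered with the JAX-RS runtime. A hedged sketch of that wiring, assuming a javax.ws.rs.core.Application subclass (the class name below is illustrative; the actual registration is not shown in this diff):

    import java.util.HashSet;
    import java.util.Set;

    import javax.ws.rs.core.Application;

    /**
     * Illustrative JAX-RS application that registers the mapper, so any
     * OzoneException thrown by a resource becomes an HTTP Response.
     */
    public class ObjectStoreRestApplication extends Application {
      @Override
      public Set<Class<?>> getClasses() {
        Set<Class<?>> classes = new HashSet<>();
        classes.add(OzoneExceptionMapper.class);
        return classes;
      }
    }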

