http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java deleted file mode 100644 index bf301b8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.scm.protocolPB; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.scm.ScmInfo; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; 
-import org.apache.hadoop.scm.container.common.helpers.Pipeline; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; - -/** - * This class is the client-side translator to translate the requests made on - * the {@link StorageContainerLocationProtocol} interface to the RPC server - * implementing {@link StorageContainerLocationProtocolPB}. - */ -@InterfaceAudience.Private -public final class StorageContainerLocationProtocolClientSideTranslatorPB - implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable { - - /** - * RpcController is not used and hence is set to null. - */ - private static final RpcController NULL_RPC_CONTROLLER = null; - - private final StorageContainerLocationProtocolPB rpcProxy; - - /** - * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. - * - * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy - */ - public StorageContainerLocationProtocolClientSideTranslatorPB( - StorageContainerLocationProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; - } - - /** - * Asks SCM where a container should be allocated. SCM responds with the set - * of datanodes that should be used creating this container. Ozone/SCM only - * supports replication factor of either 1 or 3. 
- * @param type - Replication Type - * @param factor - Replication Count - * @param containerName - Name - * @return - * @throws IOException - */ - @Override - public Pipeline allocateContainer(OzoneProtos.ReplicationType type, - OzoneProtos.ReplicationFactor factor, String - containerName, String owner) throws IOException { - - Preconditions.checkNotNull(containerName, "Container Name cannot be Null"); - Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" + - " be empty"); - ContainerRequestProto request = ContainerRequestProto.newBuilder() - .setContainerName(containerName) - .setReplicationFactor(factor) - .setReplicationType(type) - .setOwner(owner) - .build(); - - final ContainerResponseProto response; - try { - response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - if (response.getErrorCode() != ContainerResponseProto.Error.success) { - throw new IOException(response.hasErrorMessage() ? 
- response.getErrorMessage() : "Allocate container failed."); - } - return Pipeline.getFromProtoBuf(response.getPipeline()); - } - - public Pipeline getContainer(String containerName) throws IOException { - Preconditions.checkNotNull(containerName, - "Container Name cannot be Null"); - Preconditions.checkState(!containerName.isEmpty(), - "Container name cannot be empty"); - GetContainerRequestProto request = GetContainerRequestProto - .newBuilder() - .setContainerName(containerName) - .build(); - try { - GetContainerResponseProto response = - rpcProxy.getContainer(NULL_RPC_CONTROLLER, request); - return Pipeline.getFromProtoBuf(response.getPipeline()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<ContainerInfo> listContainer(String startName, String prefixName, - int count) throws IOException { - ListContainerRequestProto.Builder builder = ListContainerRequestProto - .newBuilder(); - if (prefixName != null) { - builder.setPrefixName(prefixName); - } - if (startName != null) { - builder.setStartName(startName); - } - builder.setCount(count); - ListContainerRequestProto request = builder.build(); - - try { - ListContainerResponseProto response = - rpcProxy.listContainer(NULL_RPC_CONTROLLER, request); - List<ContainerInfo> containerList = new ArrayList<>(); - for (OzoneProtos.SCMContainerInfo containerInfoProto : response - .getContainersList()) { - containerList.add(ContainerInfo.fromProtobuf(containerInfoProto)); - } - return containerList; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * Ask SCM to delete a container by name. SCM will remove - * the container mapping in its database. 
- * - * @param containerName - * @throws IOException - */ - @Override - public void deleteContainer(String containerName) - throws IOException { - Preconditions.checkState(!Strings.isNullOrEmpty(containerName), - "Container name cannot be null or empty"); - DeleteContainerRequestProto request = DeleteContainerRequestProto - .newBuilder() - .setContainerName(containerName) - .build(); - try { - rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * Queries a list of Node Statuses. - * - * @param nodeStatuses - * @return List of Datanodes. - */ - @Override - public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> - nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName) - throws IOException { - // TODO : We support only cluster wide query right now. So ignoring checking - // queryScope and poolName - Preconditions.checkNotNull(nodeStatuses); - Preconditions.checkState(nodeStatuses.size() > 0); - NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder() - .addAllQuery(nodeStatuses) - .setScope(queryScope).setPoolName(poolName).build(); - try { - NodeQueryResponseProto response = - rpcProxy.queryNode(NULL_RPC_CONTROLLER, request); - return response.getDatanodes(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - - } - - /** - * Notify from client that creates object on datanodes. 
- * @param type object type - * @param name object name - * @param op operation type (e.g., create, close, delete) - * @param stage object creation stage : begin/complete - */ - @Override - public void notifyObjectStageChange( - ObjectStageChangeRequestProto.Type type, String name, - ObjectStageChangeRequestProto.Op op, - ObjectStageChangeRequestProto.Stage stage) throws IOException { - Preconditions.checkState(!Strings.isNullOrEmpty(name), - "Object name cannot be null or empty"); - ObjectStageChangeRequestProto request = - ObjectStageChangeRequestProto.newBuilder() - .setType(type) - .setName(name) - .setOp(op) - .setStage(stage) - .build(); - try { - rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request); - } catch(ServiceException e){ - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * Creates a replication pipeline of a specified type. - * - * @param replicationType - replication type - * @param factor - factor 1 or 3 - * @param nodePool - optional machine list to build a pipeline. - * @throws IOException - */ - @Override - public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType - replicationType, OzoneProtos.ReplicationFactor factor, OzoneProtos - .NodePool nodePool) throws IOException { - PipelineRequestProto request = PipelineRequestProto.newBuilder() - .setNodePool(nodePool) - .setReplicationFactor(factor) - .setReplicationType(replicationType) - .build(); - try { - PipelineResponseProto response = - rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request); - if (response.getErrorCode() == - PipelineResponseProto.Error.success) { - Preconditions.checkState(response.hasPipeline(), "With success, " + - "must come a pipeline"); - return Pipeline.getFromProtoBuf(response.getPipeline()); - } else { - String errorMessage = String.format("create replication pipeline " + - "failed. code : %s Message: %s", response.getErrorCode(), - response.hasErrorMessage() ? 
response.getErrorMessage() : ""); - throw new IOException(errorMessage); - } - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public ScmInfo getScmInfo() throws IOException { - OzoneProtos.GetScmInfoRequestProto request = - OzoneProtos.GetScmInfoRequestProto.getDefaultInstance(); - try { - OzoneProtos.GetScmInfoRespsonseProto resp = rpcProxy.getScmInfo( - NULL_RPC_CONTROLLER, request); - ScmInfo.Builder builder = new ScmInfo.Builder() - .setClusterId(resp.getClusterId()) - .setScmId(resp.getScmId()); - return builder.build(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - - } - - @Override - public Object getUnderlyingProxyObject() { - return rpcProxy; - } - - @Override - public void close() { - RPC.stopProxy(rpcProxy); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java deleted file mode 100644 index 9ee1307..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm.protocolPB; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService; - -/** - * Protocol used from an HDFS node to StorageContainerManager. This extends the - * Protocol Buffers service interface to add Hadoop-specific annotations. 
- */ -@ProtocolInfo(protocolName = - "org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol", - protocolVersion = 1) -@InterfaceAudience.Private -public interface StorageContainerLocationProtocolPB - extends StorageContainerLocationProtocolService.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java deleted file mode 100644 index f9a2c09..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.protocolPB; - -/** - * This package contains classes for the client of the storage container - * protocol. 
- */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java deleted file mode 100644 index 943be2a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.hadoop.scm.storage;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import com.google.protobuf.ByteString;

import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.XceiverClientManager;

/**
 * An {@link InputStream} used by the REST service in combination with the
 * SCMClient to read the value of a key from a sequence
 * of container chunks.  All bytes of the key value are stored in container
 * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
 * instances.  This class encapsulates all state management for iterating
 * through the sequence of chunks and the sequence of buffers within each chunk.
 *
 * <p>Invariant maintained by this class: {@code buffers} is non-null exactly
 * when {@code chunkIndex >= 0}, and in that case {@code buffers} holds the
 * data of chunk {@code chunkIndex}.
 */
public class ChunkInputStream extends InputStream implements Seekable {

  private static final int EOF = -1;

  private final String key;
  private final String traceID;
  private XceiverClientManager xceiverClientManager;
  private XceiverClientSpi xceiverClient;
  private final List<ChunkInfo> chunks;
  // Index of the chunk whose data is currently loaded, or -1 if none is.
  private int chunkIndex;
  // chunkOffset[i] stores offset at which chunk i stores data in
  // ChunkInputStream
  private final long[] chunkOffset;
  private List<ByteBuffer> buffers;
  private int bufferIndex;

  /**
   * Creates a new ChunkInputStream.
   *
   * @param key chunk key
   * @param xceiverClientManager client manager that controls client
   * @param xceiverClient client to perform container calls
   * @param chunks list of chunks to read; null is treated like an empty list
   * @param traceID container protocol call traceID
   */
  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
      XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
    this.key = key;
    this.traceID = traceID;
    this.xceiverClientManager = xceiverClientManager;
    this.xceiverClient = xceiverClient;
    this.chunks = chunks;
    this.chunkIndex = -1;
    // prepareRead() already treats a null chunk list as an empty key, so the
    // constructor accepts it as well instead of failing on chunks.size().
    this.chunkOffset = new long[chunks == null ? 0 : chunks.size()];
    initializeChunkOffset();
    this.buffers = null;
    this.bufferIndex = 0;
  }

  /**
   * Fills {@code chunkOffset} with the running sum of the chunk lengths.
   */
  private void initializeChunkOffset() {
    // Accumulate in a long: an int running sum overflows once the chunks add
    // up to 2 GiB or more, which would corrupt every later offset.
    long offset = 0;
    for (int i = 0; i < chunkOffset.length; i++) {
      chunkOffset[i] = offset;
      offset += chunks.get(i).getLen();
    }
  }

  @Override
  public synchronized int read()
      throws IOException {
    checkOpen();
    int available = prepareRead(1);
    return available == EOF ? EOF :
        Byte.toUnsignedInt(buffers.get(bufferIndex).get());
  }

  @Override
  public synchronized int read(byte[] b, int off, int len) throws IOException {
    // According to the JavaDocs for InputStream, it is recommended that
    // subclasses provide an override of bulk read if possible for performance
    // reasons.  In addition to performance, we need to do it for correctness
    // reasons.  The Ozone REST service uses PipedInputStream and
    // PipedOutputStream to relay HTTP response data between a Jersey thread and
    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
    // have a subtle dependency (bug?) on the wrapped stream providing separate
    // implementations of single-byte read and bulk read.  Without this, get key
    // responses might close the connection before writing all of the bytes
    // advertised in the Content-Length.
    if (b == null) {
      throw new NullPointerException();
    }
    if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return 0;
    }
    checkOpen();
    int available = prepareRead(len);
    if (available == EOF) {
      return EOF;
    }
    buffers.get(bufferIndex).get(b, off, available);
    return available;
  }

  /**
   * Releases the container client back to its manager. Calling this more than
   * once has no further effect.
   */
  @Override
  public synchronized void close() {
    if (xceiverClientManager != null && xceiverClient != null) {
      xceiverClientManager.releaseClient(xceiverClient);
      xceiverClientManager = null;
      xceiverClient = null;
    }
  }

  /**
   * Checks if the stream is open.  If not, throws an exception.
   *
   * @throws IOException if stream is closed
   */
  private synchronized void checkOpen() throws IOException {
    if (xceiverClient == null) {
      throw new IOException("ChunkInputStream has been closed.");
    }
  }

  /**
   * Prepares to read by advancing through chunks and buffers as needed until it
   * finds data to return or encounters EOF.
   *
   * @param len desired length of data to read
   * @return length of data available to read, possibly less than desired length
   * @throws IOException if fetching a chunk from the container fails
   */
  private synchronized int prepareRead(int len) throws IOException {
    for (;;) {
      if (chunks == null || chunks.isEmpty()) {
        // This must be an empty key.
        return EOF;
      } else if (buffers == null) {
        // The first read triggers fetching the first chunk.
        readChunkFromContainer(chunkIndex + 1);
      } else if (!buffers.isEmpty() &&
          buffers.get(bufferIndex).hasRemaining()) {
        // Data is available from the current buffer.
        return Math.min(len, buffers.get(bufferIndex).remaining());
      } else if (!buffers.isEmpty() && bufferIndex < buffers.size() - 1) {
        // The current buffer is exhausted but additional buffers are available.
        ++bufferIndex;
      } else if (chunkIndex < chunks.size() - 1) {
        // There are additional chunks available.
        readChunkFromContainer(chunkIndex + 1);
      } else {
        // All available input has been consumed.
        return EOF;
      }
    }
  }

  /**
   * Attempts to read the chunk at the specified index in the chunk list.  If
   * successful, then the data of the read chunk is saved so that its bytes can
   * be returned from subsequent read calls.
   *
   * <p>{@code chunkIndex}, {@code buffers} and {@code bufferIndex} are only
   * updated after the container call succeeded, so a failed call leaves the
   * stream exactly where it was and the same chunk can be requested again.
   *
   * @param index index in {@code chunks} of the chunk to read
   * @throws IOException if there is an I/O error while performing the call
   */
  private synchronized void readChunkFromContainer(int index)
      throws IOException {
    final ReadChunkResponseProto readChunkResponse;
    try {
      readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
          chunks.get(index), key, traceID);
    } catch (IOException e) {
      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
    }
    ByteString byteString = readChunkResponse.getData();
    buffers = byteString.asReadOnlyByteBufferList();
    bufferIndex = 0;
    chunkIndex = index;
  }

  /**
   * Moves the read position to {@code pos} by loading the chunk that contains
   * it and positioning the matching buffer.
   *
   * @param pos absolute position within the key data
   * @throws EOFException if {@code pos} is negative or not smaller than the
   *     total length of all chunks ({@code seek(0)} on an empty key is
   *     accepted and does nothing)
   * @throws IOException if the stream is closed or reading the chunk fails
   */
  @Override
  public synchronized void seek(long pos) throws IOException {
    checkOpen();
    final int chunkCount = chunkOffset.length;
    if (chunkCount == 0) {
      // Empty key: there is no last chunk to compute a length from. Position
      // 0 is the only valid one and needs no data to be loaded.
      if (pos != 0) {
        throw new EOFException(
            "EOF encountered pos: " + pos + " container key: " + key);
      }
      return;
    }
    final long length =
        chunkOffset[chunkCount - 1] + chunks.get(chunkCount - 1).getLen();
    if (pos < 0 || pos >= length) {
      throw new EOFException(
          "EOF encountered pos: " + pos + " container key: " + key);
    }
    int target = Arrays.binarySearch(chunkOffset, pos);
    if (target < 0) {
      // Binary search returns -insertionPoint - 1  if element is not present
      // in the array. insertionPoint is the point at which element would be
      // inserted in the sorted array. The chunk containing pos is the one
      // right before it: insertionPoint - 1. Because chunkOffset[0] == 0 and
      // pos >= 0, insertionPoint is at least 1 here.
      target = -target - 2;
    }
    readChunkFromContainer(target);
    adjustBufferIndex(pos);
  }

  /**
   * Selects the buffer of the current chunk that contains {@code pos} and
   * sets that buffer's position accordingly.
   *
   * @param pos absolute position within the key data, inside the current chunk
   */
  private void adjustBufferIndex(long pos) {
    long bufferStart = chunkOffset[chunkIndex];
    for (int i = 0; i < buffers.size(); i++) {
      int capacity = buffers.get(i).capacity();
      if (pos - bufferStart >= capacity) {
        bufferStart += capacity;
      } else {
        bufferIndex = i;
        break;
      }
    }
    buffers.get(bufferIndex).position((int) (pos - bufferStart));
  }

  /**
   * @return the absolute position of the next byte to read, or 0 if no chunk
   *     has been loaded yet
   */
  @Override
  public synchronized long getPos() throws IOException {
    if (chunkIndex == -1 || buffers.isEmpty()) {
      return chunkIndex == -1 ? 0 : chunkOffset[chunkIndex];
    }
    long pos = chunkOffset[chunkIndex];
    // Buffers in front of the current one are part of the position too. Their
    // capacity is used (not their position) to mirror adjustBufferIndex(),
    // which may have skipped them without reading.
    for (int i = 0; i < bufferIndex; i++) {
      pos += buffers.get(i).capacity();
    }
    return pos + buffers.get(bufferIndex).position();
  }

  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java deleted file mode 100644 index 64c10da..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.hadoop.scm.storage;

import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.UUID;

import com.google.protobuf.ByteString;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;

/**
 * An {@link OutputStream} used by the REST service in combination with the
 * SCMClient to write the value of a key to a sequence
 * of container chunks.  Writes are buffered locally and periodically written to
 * the container as a new chunk.  In order to preserve the semantics that
 * replacement of a pre-existing key is atomic, each instance of the stream has
 * an internal unique identifier.  This unique identifier and a monotonically
 * increasing chunk index form a composite key that is used as the chunk name.
 * After all data is written, a putKey call creates or updates the corresponding
 * container key, and this call includes the full list of chunks that make up
 * the key data.  The list of chunks is updated all at once.  Therefore, a
 * concurrent reader never can see an intermediate state in which different
 * chunks of data from different versions of the key data are interleaved.
 * This class encapsulates all state management for buffering and writing
 * through to the container.
 *
 * <p>All methods that touch the buffer are {@code synchronized} on this
 * stream.
 */
public class ChunkOutputStream extends OutputStream {

  private final String containerKey;
  private final String key;
  private final String traceID;
  private final KeyData.Builder containerKeyData;
  private XceiverClientManager xceiverClientManager;
  private XceiverClientSpi xceiverClient;
  private ByteBuffer buffer;
  private final String streamId;
  private int chunkIndex;
  private final int chunkSize;

  /**
   * Creates a new ChunkOutputStream.
   *
   * @param containerKey container key
   * @param key chunk key
   * @param xceiverClientManager client manager that controls client
   * @param xceiverClient client to perform container calls
   * @param traceID container protocol call args
   * @param chunkSize chunk size
   */
  public ChunkOutputStream(String containerKey, String key,
      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
      String traceID, int chunkSize) {
    this.containerKey = containerKey;
    this.key = key;
    this.traceID = traceID;
    this.chunkSize = chunkSize;
    KeyValue keyValue = KeyValue.newBuilder()
        .setKey("TYPE").setValue("KEY").build();
    this.containerKeyData = KeyData.newBuilder()
        .setContainerName(xceiverClient.getPipeline().getContainerName())
        .setName(containerKey)
        .addMetadata(keyValue);
    this.xceiverClientManager = xceiverClientManager;
    this.xceiverClient = xceiverClient;
    this.buffer = ByteBuffer.allocate(chunkSize);
    this.streamId = UUID.randomUUID().toString();
    this.chunkIndex = 0;
  }

  @Override
  public synchronized void write(int b) throws IOException {
    checkOpen();
    // Remember the state before the put so that a failed flush also discards
    // the byte whose write is being reported as failed.
    int rollbackPosition = buffer.position();
    int rollbackLimit = buffer.limit();
    buffer.put((byte)b);
    if (buffer.position() == chunkSize) {
      flushBufferToChunk(rollbackPosition, rollbackLimit);
    }
  }

  /**
   * Buffers {@code len} bytes from {@code b}, writing a chunk to the container
   * each time the buffer fills up.
   *
   * <p>This method is synchronized like {@link #write(int)}, {@link #flush()}
   * and {@link #close()}; without that, a concurrent call to any of them
   * could interleave with this loop on the shared buffer.
   */
  @Override
  public synchronized void write(byte[] b, int off, int len)
      throws IOException {
    if (b == null) {
      throw new NullPointerException();
    }
    if ((off < 0) || (off > b.length) || (len < 0) ||
        ((off + len) > b.length) || ((off + len) < 0)) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return;
    }
    checkOpen();
    while (len > 0) {
      int writeLen = Math.min(chunkSize - buffer.position(), len);
      int rollbackPosition = buffer.position();
      int rollbackLimit = buffer.limit();
      buffer.put(b, off, writeLen);
      if (buffer.position() == chunkSize) {
        flushBufferToChunk(rollbackPosition, rollbackLimit);
      }
      off += writeLen;
      len -= writeLen;
    }
  }

  /**
   * Writes any buffered bytes to the container as a new chunk, even if the
   * buffer is not full.
   */
  @Override
  public synchronized void flush() throws IOException {
    checkOpen();
    if (buffer.position() > 0) {
      int rollbackPosition = buffer.position();
      int rollbackLimit = buffer.limit();
      flushBufferToChunk(rollbackPosition, rollbackLimit);
    }
  }

  /**
   * Writes any remaining buffered bytes as a final chunk, issues the putKey
   * call with the accumulated chunk list and releases the container client.
   * The client is released even if one of the container calls fails. Calling
   * this on an already closed stream does nothing.
   */
  @Override
  public synchronized void close() throws IOException {
    if (xceiverClientManager != null && xceiverClient != null &&
        buffer != null) {
      try {
        if (buffer.position() > 0) {
          writeChunkToContainer();
        }
        putKey(xceiverClient, containerKeyData.build(), traceID);
      } catch (IOException e) {
        throw new IOException(
            "Unexpected Storage Container Exception: " + e.toString(), e);
      } finally {
        xceiverClientManager.releaseClient(xceiverClient);
        xceiverClientManager = null;
        xceiverClient = null;
        buffer = null;
      }
    }
  }

  /**
   * Checks if the stream is open.  If not, throws an exception.
   *
   * @throws IOException if stream is closed
   */
  private synchronized void checkOpen() throws IOException {
    if (xceiverClient == null) {
      throw new IOException("ChunkOutputStream has been closed.");
    }
  }

  /**
   * Attempts to flush buffered writes by writing a new chunk to the container.
   * If successful, then clears the buffer to prepare to receive writes for a
   * new chunk.
   *
   * @param rollbackPosition position to restore in buffer if write fails
   * @param rollbackLimit limit to restore in buffer if write fails
   * @throws IOException if there is an I/O error while performing the call
   */
  private synchronized void flushBufferToChunk(int rollbackPosition,
      int rollbackLimit) throws IOException {
    boolean success = false;
    try {
      writeChunkToContainer();
      success = true;
    } finally {
      if (success) {
        buffer.clear();
      } else {
        // writeChunkToContainer() flipped and consumed the buffer; undo that.
        buffer.position(rollbackPosition);
        buffer.limit(rollbackLimit);
      }
    }
  }

  /**
   * Writes buffered data as a new chunk to the container and saves chunk
   * information to be used later in putKey call.
   *
   * @throws IOException if there is an I/O error while performing the call
   */
  private synchronized void writeChunkToContainer() throws IOException {
    buffer.flip();
    ByteString data = ByteString.copyFrom(buffer);
    ChunkInfo chunk = ChunkInfo
        .newBuilder()
        .setChunkName(
            DigestUtils.md5Hex(key) + "_stream_"
                + streamId + "_chunk_" + ++chunkIndex)
        .setOffset(0)
        .setLen(data.size())
        .build();
    try {
      writeChunk(xceiverClient, chunk, key, data, traceID);
    } catch (IOException e) {
      throw new IOException(
          "Unexpected Storage Container Exception: " + e.toString(), e);
    }
    // Only chunks that were written successfully become part of the key.
    containerKeyData.addChunks(chunk);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java deleted file mode 100644 index 1cde67c..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.storage; - -import com.google.protobuf.ByteString; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .GetKeyRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .GetKeyResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .GetSmallFileRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .GetSmallFileResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .PutKeyRequestProto; -import 
org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .PutSmallFileRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .ReadChunkRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .ReadChunkResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .WriteChunkRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .ReadContainerResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .ReadContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; - -import java.io.IOException; -import org.apache.hadoop.scm.XceiverClientSpi; - -/** - * Implementation of all container protocol calls performed by Container - * clients. - */ -public final class ContainerProtocolCalls { - - /** - * There is no need to instantiate this class. - */ - private ContainerProtocolCalls() { - } - - /** - * Calls the container protocol to get a container key. 
- * - * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container - * @param traceID container protocol call args - * @return container protocol get key response - * @throws IOException if there is an I/O error while performing the call - */ - public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient, - KeyData containerKeyData, String traceID) throws IOException { - GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetKey) - .setTraceID(traceID) - .setDatanodeID(id) - .setGetKey(readKeyRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - return response.getGetKey(); - } - - /** - * Calls the container protocol to put a container key. 
- * - * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container - * @param traceID container protocol call args - * @throws IOException if there is an I/O error while performing the call - */ - public static void putKey(XceiverClientSpi xceiverClient, - KeyData containerKeyData, String traceID) throws IOException { - PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.PutKey) - .setTraceID(traceID) - .setDatanodeID(id) - .setPutKey(createKeyRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - } - - /** - * Calls the container protocol to read a chunk. - * - * @param xceiverClient client to perform call - * @param chunk information about chunk to read - * @param key the key name - * @param traceID container protocol call args - * @return container protocol read chunk response - * @throws IOException if there is an I/O error while performing the call - */ - public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, String key, String traceID) - throws IOException { - ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) - .setChunkData(chunk); - String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.ReadChunk) - .setTraceID(traceID) - .setDatanodeID(id) - .setReadChunk(readChunkRequest) - .build(); - ContainerCommandResponseProto response = 
xceiverClient.sendCommand(request); - validateContainerResponse(response); - return response.getReadChunk(); - } - - /** - * Calls the container protocol to write a chunk. - * - * @param xceiverClient client to perform call - * @param chunk information about chunk to write - * @param key the key name - * @param data the data of the chunk to write - * @param traceID container protocol call args - * @throws IOException if there is an I/O error while performing the call - */ - public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, - String key, ByteString data, String traceID) - throws IOException { - WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) - .setChunkData(chunk) - .setData(data); - String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.WriteChunk) - .setTraceID(traceID) - .setDatanodeID(id) - .setWriteChunk(writeChunkRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - } - - /** - * Allows writing a small file using single RPC. This takes the container - * name, key name and data to write sends all that data to the container using - * a single RPC. This API is designed to be used for files which are smaller - * than 1 MB. - * - * @param client - client that communicates with the container. - * @param containerName - Name of the container - * @param key - Name of the Key - * @param data - Data to be written into the container. - * @param traceID - Trace ID for logging purpose. 
- * @throws IOException - */ - public static void writeSmallFile(XceiverClientSpi client, - String containerName, String key, byte[] data, String traceID) - throws IOException { - - KeyData containerKeyData = - KeyData.newBuilder().setContainerName(containerName).setName(key) - .build(); - PutKeyRequestProto.Builder createKeyRequest = - PutKeyRequestProto.newBuilder() - .setPipeline(client.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - - KeyValue keyValue = - KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true") - .build(); - ChunkInfo chunk = - ChunkInfo.newBuilder().setChunkName(key + "_chunk").setOffset(0) - .setLen(data.length).addMetadata(keyValue).build(); - - PutSmallFileRequestProto putSmallFileRequest = - PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk) - .setKey(createKeyRequest).setData(ByteString.copyFrom(data)) - .build(); - - String id = client.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto request = - ContainerCommandRequestProto.newBuilder() - .setCmdType(Type.PutSmallFile) - .setTraceID(traceID) - .setDatanodeID(id) - .setPutSmallFile(putSmallFileRequest) - .build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - } - - /** - * createContainer call that creates a container on the datanode. 
- * @param client - client - * @param traceID - traceID - * @throws IOException - */ - public static void createContainer(XceiverClientSpi client, String traceID) - throws IOException { - ContainerProtos.CreateContainerRequestProto.Builder createRequest = - ContainerProtos.CreateContainerRequestProto - .newBuilder(); - ContainerProtos.ContainerData.Builder containerData = ContainerProtos - .ContainerData.newBuilder(); - containerData.setName(client.getPipeline().getContainerName()); - createRequest.setPipeline(client.getPipeline().getProtobufMessage()); - createRequest.setContainerData(containerData.build()); - - String id = client.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CreateContainer); - request.setCreateContainer(createRequest); - request.setDatanodeID(id); - request.setTraceID(traceID); - ContainerCommandResponseProto response = client.sendCommand( - request.build()); - validateContainerResponse(response); - } - - /** - * Deletes a container from a pipeline. - * - * @param client - * @param force whether or not to forcibly delete the container. 
- * @param traceID - * @throws IOException - */ - public static void deleteContainer(XceiverClientSpi client, - boolean force, String traceID) throws IOException { - ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest = - ContainerProtos.DeleteContainerRequestProto.newBuilder(); - deleteRequest.setName(client.getPipeline().getContainerName()); - deleteRequest.setPipeline(client.getPipeline().getProtobufMessage()); - deleteRequest.setForceDelete(force); - String id = client.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.DeleteContainer); - request.setDeleteContainer(deleteRequest); - request.setTraceID(traceID); - request.setDatanodeID(id); - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); - } - - /** - * Close a container. - * - * @param client - * @param traceID - * @throws IOException - */ - public static void closeContainer(XceiverClientSpi client, String traceID) - throws IOException { - ContainerProtos.CloseContainerRequestProto.Builder closeRequest = - ContainerProtos.CloseContainerRequestProto.newBuilder(); - closeRequest.setPipeline(client.getPipeline().getProtobufMessage()); - - String id = client.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(Type.CloseContainer); - request.setCloseContainer(closeRequest); - request.setTraceID(traceID); - request.setDatanodeID(id); - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); - } - - /** - * readContainer call that gets meta data from an existing container. 
- * - * @param client - client - * @param traceID - trace ID - * @throws IOException - */ - public static ReadContainerResponseProto readContainer( - XceiverClientSpi client, String containerName, - String traceID) throws IOException { - ReadContainerRequestProto.Builder readRequest = - ReadContainerRequestProto.newBuilder(); - readRequest.setName(containerName); - readRequest.setPipeline(client.getPipeline().getProtobufMessage()); - String id = client.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(Type.ReadContainer); - request.setReadContainer(readRequest); - request.setDatanodeID(id); - request.setTraceID(traceID); - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); - return response.getReadContainer(); - } - - /** - * Reads the data given the container name and key. - * - * @param client - * @param containerName - name of the container - * @param key - key - * @param traceID - trace ID - * @return GetSmallFileResponseProto - * @throws IOException - */ - public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, - String containerName, String key, String traceID) throws IOException { - KeyData containerKeyData = KeyData - .newBuilder() - .setContainerName(containerName) - .setName(key).build(); - - GetKeyRequestProto.Builder getKey = GetKeyRequestProto - .newBuilder() - .setPipeline(client.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - ContainerProtos.GetSmallFileRequestProto getSmallFileRequest = - GetSmallFileRequestProto - .newBuilder().setKey(getKey) - .build(); - String id = client.getPipeline().getLeader().getDatanodeUuid(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetSmallFile) - .setTraceID(traceID) - .setDatanodeID(id) - .setGetSmallFile(getSmallFileRequest) - 
.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - return response.getGetSmallFile(); - } - - /** - * Validates a response from a container protocol call. Any non-successful - * return code is mapped to a corresponding exception and thrown. - * - * @param response container protocol call response - * @throws IOException if the container protocol call failed - */ - private static void validateContainerResponse( - ContainerCommandResponseProto response - ) throws StorageContainerException { - if (response.getResult() == ContainerProtos.Result.SUCCESS) { - return; - } - throw new StorageContainerException( - response.getMessage(), response.getResult()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java deleted file mode 100644 index aa89af0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.storage; - -/** - * This package contains StorageContainerManager classes. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java deleted file mode 100644 index de159ab..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ratis; - -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.util.SizeInBytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Collections; -import java.util.Collection; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.stream.Collectors; - -/** - * Ratis helper methods. - */ -public interface RatisHelper { - Logger LOG = LoggerFactory.getLogger(RatisHelper.class); - - static String toRaftPeerIdString(DatanodeID id) { - return id.getIpAddr() + "_" + id.getRatisPort(); - } - - static String toRaftPeerAddressString(DatanodeID id) { - return id.getIpAddr() + ":" + id.getRatisPort(); - } - - static RaftPeerId toRaftPeerId(DatanodeID id) { - return RaftPeerId.valueOf(toRaftPeerIdString(id)); - } - - static RaftPeer toRaftPeer(DatanodeID id) { - return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id)); - } - - static List<RaftPeer> toRaftPeers(Pipeline pipeline) { - return toRaftPeers(pipeline.getMachines()); - } - - static <E extends DatanodeID> List<RaftPeer> toRaftPeers(List<E> datanodes) { - return datanodes.stream().map(RatisHelper::toRaftPeer) - .collect(Collectors.toList()); - } - - /* TODO: use a dummy id for all groups for the moment. - * It should be changed to a unique id for each group. 
- */ - RaftGroupId DUMMY_GROUP_ID = - RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup")); - - RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID, - Collections.emptyList()); - - static RaftGroup emptyRaftGroup() { - return EMPTY_GROUP; - } - - static RaftGroup newRaftGroup(List<DatanodeID> datanodes) { - final List<RaftPeer> newPeers = datanodes.stream() - .map(RatisHelper::toRaftPeer) - .collect(Collectors.toList()); - return RatisHelper.newRaftGroup(newPeers); - } - - static RaftGroup newRaftGroup(Collection<RaftPeer> peers) { - return peers.isEmpty()? emptyRaftGroup() - : new RaftGroup(DUMMY_GROUP_ID, peers); - } - - static RaftGroup newRaftGroup(Pipeline pipeline) { - return newRaftGroup(toRaftPeers(pipeline)); - } - - static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) { - return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()), - newRaftGroup(pipeline)); - } - - static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) { - return newRaftClient(rpcType, leader.getId(), - newRaftGroup(new ArrayList<>(Arrays.asList(leader)))); - } - - static RaftClient newRaftClient( - RpcType rpcType, RaftPeerId leader, RaftGroup group) { - LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); - final RaftProperties properties = new RaftProperties(); - RaftConfigKeys.Rpc.setType(properties, rpcType); - GrpcConfigKeys.setMessageSizeMax(properties, - SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)); - - return RaftClient.newBuilder() - .setRaftGroup(group) - .setLeaderId(leader) - .setProperties(properties) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java deleted file mode 100644 index c13c20c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis; - -/** - * This package contains classes related to Apache Ratis. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java deleted file mode 100644 index 29242ad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.shaded.com.google.protobuf; - -/** Utilities for the shaded protobuf in Ratis. */ -public interface ShadedProtoUtil { - /** - * @param bytes - * @return the wrapped shaded {@link ByteString} (no coping). - */ - static ByteString asShadedByteString(byte[] bytes) { - return ByteString.wrap(bytes); - } - - /** - * @param shaded - * @return a {@link com.google.protobuf.ByteString} (require coping). 
- */ - static com.google.protobuf.ByteString asByteString(ByteString shaded) { - return com.google.protobuf.ByteString.copyFrom( - shaded.asReadOnlyByteBuffer()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java deleted file mode 100644 index 032dd96..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.shaded.com.google.protobuf; - -/** - * This package contains classes related to the shaded protobuf in Apache Ratis. 
- */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto deleted file mode 100644 index 4e79827..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto +++ /dev/null @@ -1,415 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * These .proto interfaces are private and Unstable. - * Please see http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/InterfaceClassification.html - * for what changes are allowed for a *Unstable* .proto interface. - */ - -// This file contains protocol buffers that are used to transfer data -// to and from the datanode. 
-option java_package = "org.apache.hadoop.hdfs.ozone.protocol.proto"; -option java_outer_classname = "ContainerProtos"; -option java_generate_equals_and_hash = true; -package hadoop.hdfs.ozone; -import "hdfs.proto"; -import "Ozone.proto"; - -/** - * Commands that are used to manipulate the state of containers on a datanode. - * - * These commands allow us to work against the datanode - from - * StorageContainer Manager as well as clients. - * - * 1. CreateContainer - This call is usually made by Storage Container - * manager, when we need to create a new container on a given datanode. - * - * 2. ReadContainer - Allows end user to stat a container. For example - * this allows us to return the metadata of a container. - * - * 3. UpdateContainer - Updates a container metadata. - - * 4. DeleteContainer - This call is made to delete a container. - * - * 5. ListContainer - Returns the list of containers on this - * datanode. This will be used by tests and tools. - * - * 6. PutKey - Given a valid container, creates a key. - * - * 7. GetKey - Allows user to read the metadata of a Key. - * - * 8. DeleteKey - Deletes a given key. - * - * 9. ListKey - Returns a list of keys that are present inside - * a given container. - * - * 10. ReadChunk - Allows us to read a chunk. - * - * 11. DeleteChunk - Delete an unused chunk. - * - * 12. WriteChunk - Allows us to write a chunk - * - * 13. ListChunk - Given a Container/Key returns the list of Chunks. - * - * 14. CompactChunk - Re-writes a chunk based on Offsets. - * - * 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk. - * - * 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk. - * - * 17. CloseContainer - Closes an open container and makes it immutable. - * - * 18. CopyContainer - Copies a container from a remote machine. 
- */ - -enum Type { - CreateContainer = 1; - ReadContainer = 2; - UpdateContainer = 3; - DeleteContainer = 4; - ListContainer = 5; - - PutKey = 6; - GetKey = 7; - DeleteKey = 8; - ListKey = 9; - - ReadChunk = 10; - DeleteChunk = 11; - WriteChunk = 12; - ListChunk = 13; - CompactChunk = 14; - - /** Combines Key and Chunk Operation into Single RPC. */ - PutSmallFile = 15; - GetSmallFile = 16; - CloseContainer = 17; - -} - - -enum Result { - SUCCESS = 1; - UNSUPPORTED_REQUEST = 2; - MALFORMED_REQUEST = 3; - CONTAINER_INTERNAL_ERROR = 4; - INVALID_CONFIG = 5; - INVALID_FILE_HASH_FOUND = 6; - CONTAINER_EXISTS = 7; - NO_SUCH_ALGORITHM = 8; - CONTAINER_NOT_FOUND = 9; - IO_EXCEPTION = 10; - UNABLE_TO_READ_METADATA_DB = 11; - NO_SUCH_KEY = 12; - OVERWRITE_FLAG_REQUIRED = 13; - UNABLE_TO_FIND_DATA_DIR = 14; - INVALID_WRITE_SIZE = 15; - CHECKSUM_MISMATCH = 16; - UNABLE_TO_FIND_CHUNK = 17; - PROTOC_DECODING_ERROR = 18; - INVALID_ARGUMENT = 19; - PUT_SMALL_FILE_ERROR = 20; - GET_SMALL_FILE_ERROR = 21; - CLOSED_CONTAINER_IO = 22; - ERROR_CONTAINER_NOT_EMPTY = 23; - ERROR_IN_COMPACT_DB = 24; - UNCLOSED_CONTAINER_IO = 25; - DELETE_ON_OPEN_CONTAINER = 26; - CLOSED_CONTAINER_RETRY = 27; -} - -message ContainerCommandRequestProto { - required Type cmdType = 1; // Type of the command - - // A string that identifies this command, we generate Trace ID in Ozone - // frontend and this allows us to trace that command all over ozone. - optional string traceID = 2; - - // One of the following command is available when the corresponding - // cmdType is set. At the protocol level we allow only - // one command in each packet. - // TODO : Upgrade to Protobuf 2.6 or later. 
- optional CreateContainerRequestProto createContainer = 3; - optional ReadContainerRequestProto readContainer = 4; - optional UpdateContainerRequestProto updateContainer = 5; - optional DeleteContainerRequestProto deleteContainer = 6; - optional ListContainerRequestProto listContainer = 7; - - optional PutKeyRequestProto putKey = 8; - optional GetKeyRequestProto getKey = 9; - optional DeleteKeyRequestProto deleteKey = 10; - optional ListKeyRequestProto listKey = 11; - - optional ReadChunkRequestProto readChunk = 12; - optional WriteChunkRequestProto writeChunk = 13; - optional DeleteChunkRequestProto deleteChunk = 14; - optional ListChunkRequestProto listChunk = 15; - - optional PutSmallFileRequestProto putSmallFile = 16; - optional GetSmallFileRequestProto getSmallFile = 17; - optional CloseContainerRequestProto closeContainer = 18; - required string datanodeID = 19; -} - -message ContainerCommandResponseProto { - required Type cmdType = 1; - optional string traceID = 2; - - optional CreateContainerResponseProto createContainer = 3; - optional ReadContainerResponseProto readContainer = 4; - optional UpdateContainerResponseProto updateContainer = 5; - optional DeleteContainerResponseProto deleteContainer = 6; - optional ListContainerResponseProto listContainer = 7; - - optional PutKeyResponseProto putKey = 8; - optional GetKeyResponseProto getKey = 9; - optional DeleteKeyResponseProto deleteKey = 10; - optional ListKeyResponseProto listKey = 11; - - optional WriteChunkResponseProto writeChunk = 12; - optional ReadChunkResponseProto readChunk = 13; - optional DeleteChunkResponseProto deleteChunk = 14; - optional ListChunkResponseProto listChunk = 15; - - required Result result = 17; - optional string message = 18; - - optional PutSmallFileResponseProto putSmallFile = 19; - optional GetSmallFileResponseProto getSmallFile = 20; - optional CloseContainerResponseProto closeContainer = 21; - -} - -message ContainerData { - required string name = 1; - repeated KeyValue 
metadata = 2; - optional string dbPath = 3; - optional string containerPath = 4; - optional string hash = 6; - optional int64 bytesUsed = 7; - optional int64 size = 8; - optional int64 keyCount = 9; - //TODO: change required after we switch container ID from string to long - optional int64 containerID = 10; - optional LifeCycleState state = 11 [default = OPEN]; -} - -message ContainerMeta { - required string fileName = 1; - required string hash = 2; -} - -// Container Messages. -message CreateContainerRequestProto { - required Pipeline pipeline = 1; - required ContainerData containerData = 2; -} - -message CreateContainerResponseProto { -} - -message ReadContainerRequestProto { - required Pipeline pipeline = 1; - required string name = 2; -} - -message ReadContainerResponseProto { - optional ContainerData containerData = 2; -} - -message UpdateContainerRequestProto { - required Pipeline pipeline = 1; - required ContainerData containerData = 2; - optional bool forceUpdate = 3 [default = false]; -} - -message UpdateContainerResponseProto { -} - -message DeleteContainerRequestProto { - required Pipeline pipeline = 1; - required string name = 2; - optional bool forceDelete = 3 [default = false]; -} - -message DeleteContainerResponseProto { -} - -message ListContainerRequestProto { - required Pipeline pipeline = 1; - optional string prefix = 2; - required uint32 count = 3; // Max Results to return - optional string prevKey = 4; // if this is not set query from start. -} - -message ListContainerResponseProto { - repeated ContainerData containerData = 1; -} - -message CloseContainerRequestProto { - required Pipeline pipeline = 1; -} - -message CloseContainerResponseProto { - optional Pipeline pipeline = 1; - optional string hash = 2; -} - -message KeyData { - required string containerName = 1; - required string name = 2; - optional int64 flags = 3; // for future use. - repeated KeyValue metadata = 4; - repeated ChunkInfo chunks = 5; -} - -// Key Messages. 
-message PutKeyRequestProto { - required Pipeline pipeline = 1; - required KeyData keyData = 2; -} - -message PutKeyResponseProto { -} - -message GetKeyRequestProto { - required Pipeline pipeline = 1; - required KeyData keyData = 2; -} - -message GetKeyResponseProto { - required KeyData keyData = 1; -} - - -message DeleteKeyRequestProto { - required Pipeline pipeline = 1; - required string name = 2; -} - -message DeleteKeyResponseProto { -} - -message ListKeyRequestProto { - required Pipeline pipeline = 1; - optional string prefix = 2; // if specified returns keys that match prefix. - required string prevKey = 3; - required uint32 count = 4; - -} - -message ListKeyResponseProto { - repeated KeyData keyData = 1; -} - -// Chunk Operations - -message ChunkInfo { - required string chunkName = 1; - required uint64 offset = 2; - required uint64 len = 3; - optional string checksum = 4; - repeated KeyValue metadata = 5; -} - -enum Stage { - WRITE_DATA = 1; - COMMIT_DATA = 2; - COMBINED = 3; -} - -message WriteChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required ChunkInfo chunkData = 3; - optional bytes data = 4; - optional Stage stage = 5 [default = COMBINED]; -} - -message WriteChunkResponseProto { -} - -message ReadChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required ChunkInfo chunkData = 3; -} - -message ReadChunkResponseProto { - required Pipeline pipeline = 1; - required ChunkInfo chunkData = 2; - required bytes data = 3; -} - -message DeleteChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required ChunkInfo chunkData = 3; -} - -message DeleteChunkResponseProto { -} - -message ListChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required string prevChunkName = 3; - required uint32 count = 4; -} - -message ListChunkResponseProto { - repeated ChunkInfo chunkData = 1; -} - -/** For small file access combines 
write chunk and putKey into a single -RPC */ - -message PutSmallFileRequestProto { - required PutKeyRequestProto key = 1; - required ChunkInfo chunkInfo = 2; - required bytes data = 3; -} - - -message PutSmallFileResponseProto { - -} - -message GetSmallFileRequestProto { - required GetKeyRequestProto key = 1; -} - -message GetSmallFileResponseProto { - required ReadChunkResponseProto data = 1; -} - -message CopyContainerRequestProto { - required string containerName = 1; - required uint64 readOffset = 2; - optional uint64 len = 3; -} - -message CopyContainerResponseProto { - required string archiveName = 1; - required uint64 readOffset = 2; - required uint64 len = 3; - required bool eof = 4; - repeated bytes data = 5; - optional int64 checksum = 6; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto deleted file mode 100644 index cdc1df5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto +++ /dev/null @@ -1,457 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * These .proto interfaces are private and unstable. - * Please see http://wiki.apache.org/hadoop/Compatibility - * for what changes are allowed for a *unstable* .proto interface. - */ - -option java_package = "org.apache.hadoop.ozone.protocol.proto"; -option java_outer_classname = "KeySpaceManagerProtocolProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -package hadoop.hdfs.ozone; - -/** -This is file contains the protocol to communicate with -Ozone key space manager. Ozone KSM manages the namespace for ozone. -This is similar to Namenode for Ozone. 
-*/ - -import "hdfs.proto"; -import "Ozone.proto"; - -enum Status { - OK = 1; - VOLUME_NOT_UNIQUE = 2; - VOLUME_NOT_FOUND = 3; - VOLUME_NOT_EMPTY = 4; - VOLUME_ALREADY_EXISTS = 5; - USER_NOT_FOUND = 6; - USER_TOO_MANY_VOLUMES = 7; - BUCKET_NOT_FOUND = 8; - BUCKET_NOT_EMPTY = 9; - BUCKET_ALREADY_EXISTS = 10; - KEY_ALREADY_EXISTS = 11; - KEY_NOT_FOUND = 12; - ACCESS_DENIED = 13; - INTERNAL_ERROR = 14; -} - - -message VolumeInfo { - required string adminName = 1; - required string ownerName = 2; - required string volume = 3; - optional uint64 quotaInBytes = 4; - repeated KeyValue metadata = 5; - repeated OzoneAclInfo volumeAcls = 6; - required uint64 creationTime = 7; -} - -/** - Creates a volume -*/ -message CreateVolumeRequest { - required VolumeInfo volumeInfo = 1; -} - -message CreateVolumeResponse { - - required Status status = 1; -} - -message VolumeList { - repeated string volumeNames = 1; -} - -/** - Changes the Volume Properties -- like ownership and quota for a volume. -*/ -message SetVolumePropertyRequest { - required string volumeName = 1; - optional string ownerName = 2; - optional uint64 quotaInBytes = 3; -} - -message SetVolumePropertyResponse { - required Status status = 1; -} - -/** - * Checks if the user has specified permissions for the volume - */ -message CheckVolumeAccessRequest { - required string volumeName = 1; - required OzoneAclInfo userAcl = 2; -} - -message CheckVolumeAccessResponse { - - required Status status = 1; -} - - -/** - Returns information about a volume. -*/ - -message InfoVolumeRequest { - required string volumeName = 1; -} - -message InfoVolumeResponse { - required Status status = 1; - optional VolumeInfo volumeInfo = 2; - -} - -/** - Deletes an existing volume. -*/ -message DeleteVolumeRequest { - required string volumeName = 1; -} - -message DeleteVolumeResponse { - required Status status = 1; -} - - -/** - List Volumes -- List all volumes in the cluster or by user. 
-*/ - -message ListVolumeRequest { - enum Scope { - USER_VOLUMES = 1; // User volumes -- called by user - VOLUMES_BY_USER = 2; // User volumes - called by Admin - VOLUMES_BY_CLUSTER = 3; // All volumes in the cluster - } - required Scope scope = 1; - optional string userName = 2; - optional string prefix = 3; - optional string prevKey = 4; - optional uint32 maxKeys = 5; -} - -message ListVolumeResponse { - required Status status = 1; - repeated VolumeInfo volumeInfo = 2; -} - -message BucketInfo { - required string volumeName = 1; - required string bucketName = 2; - repeated OzoneAclInfo acls = 3; - required bool isVersionEnabled = 4 [default = false]; - required StorageTypeProto storageType = 5 [default = DISK]; - required uint64 creationTime = 6; -} - -message BucketArgs { - required string volumeName = 1; - required string bucketName = 2; - repeated OzoneAclInfo addAcls = 3; - repeated OzoneAclInfo removeAcls = 4; - optional bool isVersionEnabled = 5; - optional StorageTypeProto storageType = 6; -} - -message OzoneAclInfo { - enum OzoneAclType { - USER = 1; - GROUP = 2; - WORLD = 3; - } - enum OzoneAclRights { - READ = 1; - WRITE = 2; - READ_WRITE = 3; - } - required OzoneAclType type = 1; - required string name = 2; - required OzoneAclRights rights = 3; -} - -message CreateBucketRequest { - required BucketInfo bucketInfo = 1; -} - -message CreateBucketResponse { - required Status status = 1; -} - -message InfoBucketRequest { - required string volumeName = 1; - required string bucketName = 2; -} - -message InfoBucketResponse { - required Status status = 1; - optional BucketInfo bucketInfo = 2; -} - -message ListBucketsRequest { - required string volumeName = 1; - optional string startKey = 2; - optional string prefix = 3; - optional int32 count = 4; -} - -message ListBucketsResponse { - required Status status = 1; - repeated BucketInfo bucketInfo = 2; -} - -message KeyArgs { - required string volumeName = 1; - required string bucketName = 2; - required string 
keyName = 3; - optional uint64 dataSize = 4; - optional hadoop.hdfs.ozone.ReplicationType type = 5; - optional hadoop.hdfs.ozone.ReplicationFactor factor = 6; -} - -message KeyLocation { - required string blockID = 1; - required string containerName = 2; - required bool shouldCreateContainer = 3; - required uint64 offset = 4; - required uint64 length = 5; - // indicated at which version this block gets created. - optional uint64 createVersion = 6; -} - -message KeyLocationList { - optional uint64 version = 1; - repeated KeyLocation keyLocations = 2; -} - -message KeyInfo { - required string volumeName = 1; - required string bucketName = 2; - required string keyName = 3; - required uint64 dataSize = 4; - repeated KeyLocationList keyLocationList = 5; - required uint64 creationTime = 6; - required uint64 modificationTime = 7; - optional uint64 latestVersion = 8; -} - -message LocateKeyRequest { - required KeyArgs keyArgs = 1; -} - -message LocateKeyResponse { - required Status status = 1; - optional KeyInfo keyInfo = 2; - // clients' followup request may carry this ID for stateful operations (similar - // to a cookie). - optional uint32 ID = 3; - // TODO : allow specifiying a particular version to read. 
- optional uint64 openVersion = 4; -} - -message SetBucketPropertyRequest { - required BucketArgs bucketArgs = 1; -} - -message SetBucketPropertyResponse { - required Status status = 1; -} - -message DeleteBucketRequest { - required string volumeName = 1; - required string bucketName = 2; -} - -message DeleteBucketResponse { - required Status status = 1; -} - -message ListKeysRequest { - required string volumeName = 1; - required string bucketName = 2; - optional string startKey = 3; - optional string prefix = 4; - optional int32 count = 5; -} - -message ListKeysResponse { - required Status status = 1; - repeated KeyInfo keyInfo = 2; -} - -message AllocateBlockRequest { - required KeyArgs keyArgs = 1; - required uint32 clientID = 2; -} - -message AllocateBlockResponse { - required Status status = 1; - required KeyLocation keyLocation = 2; -} - -message CommitKeyRequest { - required KeyArgs keyArgs = 1; - required uint32 clientID = 2; -} - -message CommitKeyResponse { - required Status status = 1; -} - -message ServiceListRequest { -} - -message ServiceListResponse { - required Status status = 1; - repeated ServiceInfo serviceInfo = 2; -} - -message ServicePort { - enum Type { - RPC = 1; - HTTP = 2; - HTTPS = 3; - RATIS = 4; - }; - required Type type = 1; - required uint32 value = 2; -} - -message ServiceInfo { - required NodeType nodeType = 1; - required string hostname = 2; - repeated ServicePort servicePorts = 3; -} - -/** - The KSM service that takes care of Ozone namespace. -*/ -service KeySpaceManagerService { - - /** - Creates a Volume. - */ - rpc createVolume(CreateVolumeRequest) - returns(CreateVolumeResponse); - - /** - Allows modificiation of volume properties. - */ - rpc setVolumeProperty(SetVolumePropertyRequest) - returns (SetVolumePropertyResponse); - - /** - Checks if the specified volume is accesible by the specified user. - */ - rpc checkVolumeAccess(CheckVolumeAccessRequest) - returns (CheckVolumeAccessResponse); - - /** - Gets Volume information. 
- */ - rpc infoVolume(InfoVolumeRequest) - returns(InfoVolumeResponse); - /** - Deletes a volume if it is empty. - */ - rpc deleteVolume(DeleteVolumeRequest) - returns (DeleteVolumeResponse); - - /** - Lists Volumes - */ - rpc listVolumes(ListVolumeRequest) - returns (ListVolumeResponse); - - /** - Creates a Bucket. - */ - rpc createBucket(CreateBucketRequest) - returns(CreateBucketResponse); - - /** - Get Bucket information. - */ - rpc infoBucket(InfoBucketRequest) - returns(InfoBucketResponse); - - /** - Sets bucket properties. - */ - rpc setBucketProperty(SetBucketPropertyRequest) - returns(SetBucketPropertyResponse); - - /** - Get key. - */ - rpc createKey(LocateKeyRequest) - returns(LocateKeyResponse); - - /** - Look up for an existing key. - */ - rpc lookupKey(LocateKeyRequest) - returns(LocateKeyResponse); - - /** - Delete an existing key. - */ - rpc deleteKey(LocateKeyRequest) - returns(LocateKeyResponse); - - /** - Deletes a bucket from volume if it is empty. - */ - rpc deleteBucket(DeleteBucketRequest) - returns (DeleteBucketResponse); - - /** - List Buckets. - */ - rpc listBuckets(ListBucketsRequest) - returns(ListBucketsResponse); - - /** - List Keys. - */ - rpc listKeys(ListKeysRequest) - returns(ListKeysResponse); - - /** - Commit a key. - */ - rpc commitKey(CommitKeyRequest) - returns(CommitKeyResponse); - - /** - Allocate a new block for a key. - */ - rpc allocateBlock(AllocateBlockRequest) - returns(AllocateBlockResponse); - - /** - Returns list of Ozone services with its configuration details. - */ - rpc getServiceList(ServiceListRequest) - returns(ServiceListResponse); -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org