http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java new file mode 100644 index 0000000..1c6e39c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -0,0 +1,713 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.FileUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CLOSED_CONTAINER_IO; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .GET_SMALL_FILE_ERROR; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .NO_SUCH_ALGORITHM; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .PUT_SMALL_FILE_ERROR; + +/** + * Ozone Container dispatcher takes a call from the netty server and routes it + * to the right handler function. + */ +public class Dispatcher implements ContainerDispatcher { + static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class); + + private final ContainerManager containerManager; + private ContainerMetrics metrics; + private Configuration conf; + + /** + * Constructs an OzoneContainer that receives calls from + * XceiverServerHandler. + * + * @param containerManager - A class that manages containers. + */ + public Dispatcher(ContainerManager containerManager, Configuration config) { + Preconditions.checkNotNull(containerManager); + this.containerManager = containerManager; + this.metrics = null; + this.conf = config; + } + + @Override + public void init() { + this.metrics = ContainerMetrics.create(conf); + } + + @Override + public void shutdown() { + } + + @Override + public ContainerCommandResponseProto dispatch( + ContainerCommandRequestProto msg) { + LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(), + msg.getTraceID()); + long startNanos = System.nanoTime(); + ContainerCommandResponseProto resp = null; + try { + Preconditions.checkNotNull(msg); + Type cmdType = msg.getCmdType(); + metrics.incContainerOpcMetrics(cmdType); + if ((cmdType == Type.CreateContainer) || + (cmdType == Type.DeleteContainer) || + (cmdType == Type.ReadContainer) || + (cmdType == Type.ListContainer) || + (cmdType == Type.UpdateContainer) || + (cmdType == Type.CloseContainer)) { + resp = containerProcessHandler(msg); + } + + if ((cmdType == Type.PutKey) || + (cmdType == Type.GetKey) || + (cmdType == Type.DeleteKey) || + (cmdType == Type.ListKey)) { + resp = keyProcessHandler(msg); + } + + if ((cmdType == Type.WriteChunk) || + (cmdType == Type.ReadChunk) || + (cmdType == Type.DeleteChunk)) { + resp = chunkProcessHandler(msg); + } + + if ((cmdType == Type.PutSmallFile) || + (cmdType == Type.GetSmallFile)) { + resp = smallFileHandler(msg); + } + + if (resp != null) { + metrics.incContainerOpsLatencies(cmdType, + System.nanoTime() - startNanos); + return resp; + } + + return ContainerUtils.unsupportedRequest(msg); + } catch (StorageContainerException e) { + // This useful since the trace ID will allow us to correlate failures. + return ContainerUtils.logAndReturnError(LOG, e, msg); + } catch (IllegalStateException | NullPointerException e) { + return ContainerUtils.logAndReturnError(LOG, e, msg); + } + } + + public ContainerMetrics getContainerMetrics() { + return metrics; + } + + /** + * Handles the all Container related functionality. + * + * @param msg - command + * @return - response + * @throws StorageContainerException + */ + private ContainerCommandResponseProto containerProcessHandler( + ContainerCommandRequestProto msg) throws StorageContainerException { + try { + + switch (msg.getCmdType()) { + case CreateContainer: + return handleCreateContainer(msg); + + case DeleteContainer: + return handleDeleteContainer(msg); + + case ListContainer: + // TODO : Support List Container. + return ContainerUtils.unsupportedRequest(msg); + + case UpdateContainer: + return handleUpdateContainer(msg); + + case ReadContainer: + return handleReadContainer(msg); + + case CloseContainer: + return handleCloseContainer(msg); + + default: + return ContainerUtils.unsupportedRequest(msg); + } + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, msg); + } catch (IOException ex) { + LOG.warn("Container operation failed. " + + "Container: {} Operation: {} trace ID: {} Error: {}", + msg.getCreateContainer().getContainerData().getName(), + msg.getCmdType().name(), + msg.getTraceID(), + ex.toString(), ex); + + // TODO : Replace with finer error codes. + return ContainerUtils.getContainerResponse(msg, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, + ex.toString()).build(); + } + } + + /** + * Handles the all key related functionality. + * + * @param msg - command + * @return - response + * @throws StorageContainerException + */ + private ContainerCommandResponseProto keyProcessHandler( + ContainerCommandRequestProto msg) throws StorageContainerException { + try { + switch (msg.getCmdType()) { + case PutKey: + return handlePutKey(msg); + + case GetKey: + return handleGetKey(msg); + + case DeleteKey: + return handleDeleteKey(msg); + + case ListKey: + return ContainerUtils.unsupportedRequest(msg); + + default: + return ContainerUtils.unsupportedRequest(msg); + + } + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, msg); + } catch (IOException ex) { + LOG.warn("Container operation failed. " + + "Container: {} Operation: {} trace ID: {} Error: {}", + msg.getCreateContainer().getContainerData().getName(), + msg.getCmdType().name(), + msg.getTraceID(), + ex.toString(), ex); + + // TODO : Replace with finer error codes. + return ContainerUtils.getContainerResponse(msg, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, + ex.toString()).build(); + } + } + + /** + * Handles the all chunk related functionality. + * + * @param msg - command + * @return - response + * @throws StorageContainerException + */ + private ContainerCommandResponseProto chunkProcessHandler( + ContainerCommandRequestProto msg) throws StorageContainerException { + try { + switch (msg.getCmdType()) { + case WriteChunk: + return handleWriteChunk(msg); + + case ReadChunk: + return handleReadChunk(msg); + + case DeleteChunk: + return handleDeleteChunk(msg); + + case ListChunk: + return ContainerUtils.unsupportedRequest(msg); + + default: + return ContainerUtils.unsupportedRequest(msg); + } + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, msg); + } catch (IOException ex) { + LOG.warn("Container operation failed. " + + "Container: {} Operation: {} trace ID: {} Error: {}", + msg.getCreateContainer().getContainerData().getName(), + msg.getCmdType().name(), + msg.getTraceID(), + ex.toString(), ex); + + // TODO : Replace with finer error codes. + return ContainerUtils.getContainerResponse(msg, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, + ex.toString()).build(); + } + } + + /** + * Dispatch calls to small file hanlder. + * @param msg - request + * @return response + * @throws StorageContainerException + */ + private ContainerCommandResponseProto smallFileHandler( + ContainerCommandRequestProto msg) throws StorageContainerException { + switch (msg.getCmdType()) { + case PutSmallFile: + return handlePutSmallFile(msg); + case GetSmallFile: + return handleGetSmallFile(msg); + default: + return ContainerUtils.unsupportedRequest(msg); + } + } + + /** + * Update an existing container with the new container data. + * + * @param msg Request + * @return ContainerCommandResponseProto + * @throws IOException + */ + private ContainerCommandResponseProto handleUpdateContainer( + ContainerCommandRequestProto msg) + throws IOException { + if (!msg.hasUpdateContainer()) { + LOG.debug("Malformed read container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getUpdateContainer().getPipeline()); + String containerName = msg.getUpdateContainer() + .getContainerData().getName(); + + ContainerData data = ContainerData.getFromProtBuf( + msg.getUpdateContainer().getContainerData(), conf); + boolean forceUpdate = msg.getUpdateContainer().getForceUpdate(); + this.containerManager.updateContainer( + pipeline, containerName, data, forceUpdate); + return ContainerUtils.getContainerResponse(msg); + } + + /** + * Calls into container logic and returns appropriate response. + * + * @param msg - Request + * @return ContainerCommandResponseProto + * @throws IOException + */ + private ContainerCommandResponseProto handleReadContainer( + ContainerCommandRequestProto msg) + throws IOException { + + if (!msg.hasReadContainer()) { + LOG.debug("Malformed read container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + String name = msg.getReadContainer().getName(); + ContainerData container = this.containerManager.readContainer(name); + return ContainerUtils.getReadContainerResponse(msg, container); + } + + /** + * Calls into container logic and returns appropriate response. + * + * @param msg - Request + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleDeleteContainer( + ContainerCommandRequestProto msg) throws IOException { + + if (!msg.hasDeleteContainer()) { + LOG.debug("Malformed delete container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getDeleteContainer().getPipeline()); + Preconditions.checkNotNull(pipeline); + String name = msg.getDeleteContainer().getName(); + boolean forceDelete = msg.getDeleteContainer().getForceDelete(); + this.containerManager.deleteContainer(pipeline, name, forceDelete); + return ContainerUtils.getContainerResponse(msg); + } + + /** + * Calls into container logic and returns appropriate response. + * + * @param msg - Request + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleCreateContainer( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasCreateContainer()) { + LOG.debug("Malformed create container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + ContainerData cData = ContainerData.getFromProtBuf( + msg.getCreateContainer().getContainerData(), conf); + Preconditions.checkNotNull(cData, "Container data is null"); + + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getCreateContainer().getPipeline()); + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + + this.containerManager.createContainer(pipeline, cData); + return ContainerUtils.getContainerResponse(msg); + } + + /** + * closes an open container. + * + * @param msg - + * @return + * @throws IOException + */ + private ContainerCommandResponseProto handleCloseContainer( + ContainerCommandRequestProto msg) throws IOException { + try { + if (!msg.hasCloseContainer()) { + LOG.debug("Malformed close Container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer() + .getPipeline()); + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + if (!this.containerManager.isOpen(pipeline.getContainerName())) { + throw new StorageContainerException("Attempting to close a closed " + + "container.", CLOSED_CONTAINER_IO); + } + this.containerManager.closeContainer(pipeline.getContainerName()); + return ContainerUtils.getContainerResponse(msg); + } catch (NoSuchAlgorithmException e) { + throw new StorageContainerException("No such Algorithm", e, + NO_SUCH_ALGORITHM); + } + } + + /** + * Calls into chunk manager to write a chunk. + * + * @param msg - Request. + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleWriteChunk( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasWriteChunk()) { + LOG.debug("Malformed write chunk request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + String keyName = msg.getWriteChunk().getKeyName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getWriteChunk().getPipeline()); + Preconditions.checkNotNull(pipeline); + if (!this.containerManager.isOpen(pipeline.getContainerName())) { + throw new StorageContainerException("Write to closed container.", + CLOSED_CONTAINER_IO); + } + + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk() + .getChunkData()); + Preconditions.checkNotNull(chunkInfo); + byte[] data = null; + if (msg.getWriteChunk().getStage() == ContainerProtos.Stage.WRITE_DATA + || msg.getWriteChunk().getStage() == ContainerProtos.Stage.COMBINED) { + data = msg.getWriteChunk().getData().toByteArray(); + metrics.incContainerBytesStats(Type.WriteChunk, data.length); + + } + this.containerManager.getChunkManager() + .writeChunk(pipeline, keyName, chunkInfo, + data, msg.getWriteChunk().getStage()); + + return ChunkUtils.getChunkResponse(msg); + } + + /** + * Calls into chunk manager to read a chunk. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleReadChunk( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasReadChunk()) { + LOG.debug("Malformed read chunk request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + String keyName = msg.getReadChunk().getKeyName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getReadChunk().getPipeline()); + Preconditions.checkNotNull(pipeline); + + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk() + .getChunkData()); + Preconditions.checkNotNull(chunkInfo); + byte[] data = this.containerManager.getChunkManager().readChunk(pipeline, + keyName, chunkInfo); + metrics.incContainerBytesStats(Type.ReadChunk, data.length); + return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo); + } + + /** + * Calls into chunk manager to write a chunk. + * + * @param msg - Request. + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleDeleteChunk( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasDeleteChunk()) { + LOG.debug("Malformed delete chunk request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + String keyName = msg.getDeleteChunk().getKeyName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getDeleteChunk().getPipeline()); + Preconditions.checkNotNull(pipeline); + if (!this.containerManager.isOpen(pipeline.getContainerName())) { + throw new StorageContainerException("Write to closed container.", + CLOSED_CONTAINER_IO); + } + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk() + .getChunkData()); + Preconditions.checkNotNull(chunkInfo); + + this.containerManager.getChunkManager().deleteChunk(pipeline, keyName, + chunkInfo); + return ChunkUtils.getChunkResponse(msg); + } + + /** + * Put Key handler. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handlePutKey( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasPutKey()) { + LOG.debug("Malformed put key request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline()); + Preconditions.checkNotNull(pipeline); + if (!this.containerManager.isOpen(pipeline.getContainerName())) { + throw new StorageContainerException("Write to closed container.", + CLOSED_CONTAINER_IO); + } + KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData()); + Preconditions.checkNotNull(keyData); + this.containerManager.getKeyManager().putKey(pipeline, keyData); + long numBytes = keyData.getProtoBufMessage().toByteArray().length; + metrics.incContainerBytesStats(Type.PutKey, numBytes); + return KeyUtils.getKeyResponse(msg); + } + + /** + * Handle Get Key. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleGetKey( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasGetKey()) { + LOG.debug("Malformed get key request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData()); + Preconditions.checkNotNull(keyData); + KeyData responseData = + this.containerManager.getKeyManager().getKey(keyData); + long numBytes = responseData.getProtoBufMessage().toByteArray().length; + metrics.incContainerBytesStats(Type.GetKey, numBytes); + return KeyUtils.getKeyDataResponse(msg, responseData); + } + + /** + * Handle Delete Key. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleDeleteKey( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasDeleteKey()) { + LOG.debug("Malformed delete key request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + Pipeline pipeline = + Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline()); + Preconditions.checkNotNull(pipeline); + if (!this.containerManager.isOpen(pipeline.getContainerName())) { + throw new StorageContainerException("Write to closed container.", + CLOSED_CONTAINER_IO); + } + String keyName = msg.getDeleteKey().getName(); + Preconditions.checkNotNull(keyName); + Preconditions.checkState(!keyName.isEmpty()); + this.containerManager.getKeyManager().deleteKey(pipeline, keyName); + return KeyUtils.getKeyResponse(msg); + } + + /** + * Handles writing a chunk and associated key using single RPC. + * + * @param msg - Message. + * @return ContainerCommandResponseProto + * @throws StorageContainerException + */ + private ContainerCommandResponseProto handlePutSmallFile( + ContainerCommandRequestProto msg) throws StorageContainerException { + + if (!msg.hasPutSmallFile()) { + LOG.debug("Malformed put small file request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + try { + + Pipeline pipeline = + Pipeline.getFromProtoBuf(msg.getPutSmallFile() + .getKey().getPipeline()); + + Preconditions.checkNotNull(pipeline); + if (!this.containerManager.isOpen(pipeline.getContainerName())) { + throw new StorageContainerException("Write to closed container.", + CLOSED_CONTAINER_IO); + } + KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey() + .getKeyData()); + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile() + .getChunkInfo()); + byte[] data = msg.getPutSmallFile().getData().toByteArray(); + + metrics.incContainerBytesStats(Type.PutSmallFile, data.length); + this.containerManager.getChunkManager().writeChunk(pipeline, keyData + .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED); + List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); + chunks.add(chunkInfo.getProtoBufMessage()); + keyData.setChunks(chunks); + this.containerManager.getKeyManager().putKey(pipeline, keyData); + return FileUtils.getPutFileResponse(msg); + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, msg); + } catch (IOException e) { + throw new StorageContainerException("Put Small File Failed.", e, + PUT_SMALL_FILE_ERROR); + } + } + + /** + * Handles getting a data stream using a key. This helps in reducing the RPC + * overhead for small files. + * + * @param msg - ContainerCommandRequestProto + * @return ContainerCommandResponseProto + * @throws StorageContainerException + */ + private ContainerCommandResponseProto handleGetSmallFile( + ContainerCommandRequestProto msg) throws StorageContainerException { + ByteString dataBuf = ByteString.EMPTY; + if (!msg.hasGetSmallFile()) { + LOG.debug("Malformed get small file request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + try { + Pipeline pipeline = + Pipeline.getFromProtoBuf(msg.getGetSmallFile() + .getKey().getPipeline()); + + long bytes = 0; + Preconditions.checkNotNull(pipeline); + KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile() + .getKey().getKeyData()); + KeyData data = this.containerManager.getKeyManager().getKey(keyData); + ContainerProtos.ChunkInfo c = null; + for (ContainerProtos.ChunkInfo chunk : data.getChunks()) { + bytes += chunk.getSerializedSize(); + ByteString current = + ByteString.copyFrom(this.containerManager.getChunkManager() + .readChunk( + pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf( + chunk))); + dataBuf = dataBuf.concat(current); + c = chunk; + } + metrics.incContainerBytesStats(Type.GetSmallFile, bytes); + return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(), + ChunkInfo.getFromProtoBuf(c)); + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, msg); + } catch (IOException e) { + throw new StorageContainerException("Get Small File Failed", e, + GET_SMALL_FILE_ERROR); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java new file mode 100644 index 0000000..cf6bf12 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .NO_SUCH_KEY; + +/** + * Key Manager impl. + */ +public class KeyManagerImpl implements KeyManager { + static final Logger LOG = + LoggerFactory.getLogger(KeyManagerImpl.class); + + private static final float LOAD_FACTOR = 0.75f; + private final ContainerManager containerManager; + private final Configuration conf; + + /** + * Constructs a key Manager. + * + * @param containerManager - Container Manager. + */ + public KeyManagerImpl(ContainerManager containerManager, Configuration conf) { + Preconditions.checkNotNull(containerManager, "Container manager cannot be" + + " null"); + Preconditions.checkNotNull(conf, "Config cannot be null"); + this.containerManager = containerManager; + this.conf = conf; + } + + /** + * {@inheritDoc} + */ + @Override + public void putKey(Pipeline pipeline, KeyData data) throws IOException { + containerManager.readLock(); + try { + // We are not locking the key manager since LevelDb serializes all actions + // against a single DB. We rely on DB level locking to avoid conflicts. + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); + ContainerData cData = containerManager.readContainer(containerName); + MetadataStore db = KeyUtils.getDB(cData, conf); + + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + db.put(data.getKeyName().getBytes(KeyUtils.ENCODING), data + .getProtoBufMessage().toByteArray()); + } finally { + containerManager.readUnlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public KeyData getKey(KeyData data) throws IOException { + containerManager.readLock(); + try { + Preconditions.checkNotNull(data, "Key data cannot be null"); + Preconditions.checkNotNull(data.getContainerName(), + "Container name cannot be null"); + ContainerData cData = containerManager.readContainer(data + .getContainerName()); + MetadataStore db = KeyUtils.getDB(cData, conf); + + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + + byte[] kData = db.get(data.getKeyName().getBytes(KeyUtils.ENCODING)); + if (kData == null) { + throw new StorageContainerException("Unable to find the key.", + NO_SUCH_KEY); + } + ContainerProtos.KeyData keyData = + ContainerProtos.KeyData.parseFrom(kData); + return KeyData.getFromProtoBuf(keyData); + } finally { + containerManager.readUnlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteKey(Pipeline pipeline, String keyName) + throws IOException { + containerManager.readLock(); + try { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); + ContainerData cData = containerManager.readContainer(containerName); + MetadataStore db = KeyUtils.getDB(cData, conf); + + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + // Note : There is a race condition here, since get and delete + // are not atomic. Leaving it here since the impact is refusing + // to delete a key which might have just gotten inserted after + // the get check. + + byte[] kData = db.get(keyName.getBytes(KeyUtils.ENCODING)); + if (kData == null) { + throw new StorageContainerException("Unable to find the key.", + NO_SUCH_KEY); + } + db.delete(keyName.getBytes(KeyUtils.ENCODING)); + } finally { + containerManager.readUnlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<KeyData> listKey( + Pipeline pipeline, String prefix, String startKey, int count) + throws IOException { + Preconditions.checkNotNull(pipeline, + "Pipeline cannot be null."); + Preconditions.checkArgument(count > 0, + "Count must be a positive number."); + ContainerData cData = containerManager.readContainer(pipeline + .getContainerName()); + MetadataStore db = KeyUtils.getDB(cData, conf); + + List<KeyData> result = new ArrayList<KeyData>(); + byte[] startKeyInBytes = startKey == null ? null : + DFSUtil.string2Bytes(startKey); + MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix); + List<Map.Entry<byte[], byte[]>> range = + db.getSequentialRangeKVs(startKeyInBytes, count, prefixFilter); + for (Map.Entry<byte[], byte[]> entry : range) { + String keyName = KeyUtils.getKeyName(entry.getKey()); + KeyData value = KeyUtils.getKeyData(entry.getValue()); + KeyData data = new KeyData(value.getContainerName(), keyName); + result.add(data); + } + return result; + } + + /** + * Shutdown keyManager. + */ + @Override + public void shutdown() { + Preconditions.checkState(this.containerManager.hasWriteLock(), "asserts " + + "that we are holding the container manager lock when shutting down."); + KeyUtils.shutdownCache(ContainerCache.getInstance(conf)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java new file mode 100644 index 0000000..3e267d2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerDeletionChoosingPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Randomly choosing containers for block deletion. + */ +public class RandomContainerDeletionChoosingPolicy + implements ContainerDeletionChoosingPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(RandomContainerDeletionChoosingPolicy.class); + + @Override + public List<ContainerData> chooseContainerForBlockDeletion(int count, + Map<String, ContainerStatus> candidateContainers) + throws StorageContainerException { + Preconditions.checkNotNull(candidateContainers, + "Internal assertion: candidate containers cannot be null"); + + int currentCount = 0; + List<ContainerData> result = new LinkedList<>(); + ContainerStatus[] values = new ContainerStatus[candidateContainers.size()]; + // to get a shuffle list + for (ContainerStatus entry : DFSUtil.shuffle( + candidateContainers.values().toArray(values))) { + if (currentCount < count) { + result.add(entry.getContainer()); + currentCount++; + + LOG.debug("Select container {} for block deletion, " + + "pending deletion blocks num: {}.", + entry.getContainer().getContainerName(), + entry.getNumPendingDeletionBlocks()); + } else { + break; + } + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java new file mode 100644 index 0000000..7ef91a9 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +/** + * Storage location stats of datanodes that provide back store for containers. + * + */ +public class StorageLocationReport { + public static final StorageLocationReport[] EMPTY_ARRAY = {}; + + private final String id; + private final boolean failed; + private final long capacity; + private final long scmUsed; + private final long remaining; + + public StorageLocationReport(String id, boolean failed, + long capacity, long scmUsed, long remaining) { + this.id = id; + this.failed = failed; + this.capacity = capacity; + this.scmUsed = scmUsed; + this.remaining = remaining; + } + + public String getId() { + return id; + } + + public boolean isFailed() { + return failed; + } + + public long getCapacity() { + return capacity; + } + + public long getScmUsed() { + return scmUsed; + } + + public long getRemaining() { + return remaining; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java new file mode 100644 index 0000000..0169a96 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerDeletionChoosingPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * TopN Ordered choosing policy that choosing containers based on pending + * deletion blocks' number. + */ +public class TopNOrderedContainerDeletionChoosingPolicy + implements ContainerDeletionChoosingPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(TopNOrderedContainerDeletionChoosingPolicy.class); + + /** customized comparator used to compare differentiate container status. **/ + private static final Comparator<ContainerStatus> CONTAINER_STATUS_COMPARATOR + = new Comparator<ContainerStatus>() { + @Override + public int compare(ContainerStatus c1, ContainerStatus c2) { + return Integer.compare(c2.getNumPendingDeletionBlocks(), + c1.getNumPendingDeletionBlocks()); + } + }; + + @Override + public List<ContainerData> chooseContainerForBlockDeletion(int count, + Map<String, ContainerStatus> candidateContainers) + throws StorageContainerException { + Preconditions.checkNotNull(candidateContainers, + "Internal assertion: candidate containers cannot be null"); + + List<ContainerData> result = new LinkedList<>(); + List<ContainerStatus> orderedList = new LinkedList<>(); + orderedList.addAll(candidateContainers.values()); + Collections.sort(orderedList, CONTAINER_STATUS_COMPARATOR); + + // get top N list ordered by pending deletion blocks' number + int currentCount = 0; + for (ContainerStatus entry : orderedList) { + if (currentCount < count) { + if (entry.getNumPendingDeletionBlocks() > 0) { + result.add(entry.getContainer()); + currentCount++; + + LOG.debug( + "Select container {} for block deletion, " + + "pending deletion blocks num: {}.", + entry.getContainer().getContainerName(), + entry.getNumPendingDeletionBlocks()); + } else { + LOG.debug("Stop looking for next container, there is no" + + " pending deletion block contained in remaining containers."); + break; + } + } else { + break; + } + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java new file mode 100644 index 0000000..16da5d9 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +/** + This package is contains Ozone container implementation. +**/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java new file mode 100644 index 0000000..f55d74c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; + +/** + * Chunk Manager allows read, write, delete and listing of chunks in + * a container. + */ +public interface ChunkManager { + + /** + * writes a given chunk. + * @param pipeline - Name and the set of machines that make this container. + * @param keyName - Name of the Key. + * @param info - ChunkInfo. + * @param stage - Chunk Stage write. + * @throws StorageContainerException + */ + void writeChunk(Pipeline pipeline, String keyName, + ChunkInfo info, byte[] data, ContainerProtos.Stage stage) + throws StorageContainerException; + + /** + * reads the data defined by a chunk. + * @param pipeline - container pipeline. + * @param keyName - Name of the Key + * @param info - ChunkInfo. + * @return byte array + * @throws StorageContainerException + * + * TODO: Right now we do not support partial reads and writes of chunks. + * TODO: Explore if we need to do that for ozone. + */ + byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws + StorageContainerException; + + /** + * Deletes a given chunk. + * @param pipeline - Pipeline. + * @param keyName - Key Name + * @param info - Chunk Info + * @throws StorageContainerException + */ + void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws + StorageContainerException; + + // TODO : Support list operations. + + /** + * Shutdown the chunkManager. + */ + void shutdown(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java new file mode 100644 index 0000000..f7280e2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerStatus; + +import java.util.List; +import java.util.Map; + +/** + * This interface is used for choosing desired containers for + * block deletion. + */ +public interface ContainerDeletionChoosingPolicy { + + /** + * Chooses desired containers for block deletion. + * @param count + * how many to return + * @param candidateContainers + * candidate containers collection + * @return container data list + * @throws StorageContainerException + */ + List<ContainerData> chooseContainerForBlockDeletion(int count, + Map<String, ContainerStatus> candidateContainers) + throws StorageContainerException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java new file mode 100644 index 0000000..984fe41 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; + +/** + * Dispatcher acts as the bridge between the transport layer and + * the actual container layer. This layer is capable of transforming + * protobuf objects into corresponding class and issue the function call + * into the lower layers. + * + * The reply from the request is dispatched to the client. + */ +public interface ContainerDispatcher { + /** + * Dispatches commands to container layer. + * @param msg - Command Request + * @return Command Response + */ + ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg); + + /** + * Initialize the Dispatcher. + */ + void init(); + + /** + * Shutdown Dispatcher services. + */ + void shutdown(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java new file mode 100644 index 0000000..9c5fcea --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Returns physical path locations, where the containers will be created. + */ +public interface ContainerLocationManager { + /** + * Returns the path where the container should be placed from a set of + * locations. + * + * @return A path where we should place this container and metadata. + * @throws IOException + */ + Path getContainerPath() throws IOException; + + /** + * Returns the path where the container Data file are stored. + * + * @return a path where we place the LevelDB and data files of a container. + * @throws IOException + */ + Path getDataPath(String containerName) throws IOException; + + /** + * Returns an array of storage location usage report. + * @return storage location usage report. + */ + StorageLocationReport[] getLocationReport() throws IOException; + + /** + * Supports clean shutdown of container. + * + * @throws IOException + */ + void shutdown() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java new file mode 100644 index 0000000..88e6148 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; + +import java.io.IOException; + +/** + * Returns physical path locations, where the containers will be created. + */ +public interface ContainerLocationManagerMXBean { + + /** + * Returns an array of storage location usage report. + * + * @return storage location usage report. + */ + StorageLocationReport[] getLocationReport() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java new file mode 100644 index 0000000..2ff636e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.util.RwLock; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.List; + +/** + * Interface for container operations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ContainerManager extends RwLock { + + /** + * Init call that sets up a container Manager. + * + * @param config - Configuration. + * @param containerDirs - List of Metadata Container locations. + * @param datanodeDetails - DatanodeDetails + * @throws StorageContainerException + */ + void init(Configuration config, List<StorageLocation> containerDirs, + DatanodeDetails datanodeDetails) throws IOException; + + /** + * Creates a container with the given name. + * + * @param pipeline -- Nodes which make up this container. + * @param containerData - Container Name and metadata. + * @throws StorageContainerException + */ + void createContainer(Pipeline pipeline, ContainerData containerData) + throws StorageContainerException; + + /** + * Deletes an existing container. + * + * @param pipeline - nodes that make this container. + * @param containerName - name of the container. + * @param forceDelete - whether this container should be deleted forcibly. + * @throws StorageContainerException + */ + void deleteContainer(Pipeline pipeline, String containerName, + boolean forceDelete) throws StorageContainerException; + + /** + * Update an existing container. + * + * @param pipeline container nodes + * @param containerName name of the container + * @param data container data + * @param forceUpdate if true, update container forcibly. + * @throws StorageContainerException + */ + void updateContainer(Pipeline pipeline, String containerName, + ContainerData data, boolean forceUpdate) throws StorageContainerException; + + /** + * As simple interface for container Iterations. + * + * @param prefix - Return only values matching this prefix + * @param count - how many to return + * @param prevKey - Previous key - Server returns results from this point. + * @param data - Actual containerData + * @throws StorageContainerException + */ + void listContainer(String prefix, long count, String prevKey, + List<ContainerData> data) + throws StorageContainerException; + + /** + * Choose containers for block deletion. + * + * @param count - how many to return + * @throws StorageContainerException + */ + List<ContainerData> chooseContainerForBlockDeletion(int count) + throws StorageContainerException; + + /** + * Get metadata about a specific container. + * + * @param containerName - Name of the container + * @return ContainerData - Container Data. + * @throws StorageContainerException + */ + ContainerData readContainer(String containerName) + throws StorageContainerException; + + /** + * Closes a open container, if it is already closed or does not exist a + * StorageContainerException is thrown. + * @param containerName - Name of the container. + * @throws StorageContainerException + */ + void closeContainer(String containerName) + throws StorageContainerException, NoSuchAlgorithmException; + + /** + * Checks if a container exists. + * @param containerName - Name of the container. + * @return true if the container is open false otherwise. + * @throws StorageContainerException - Throws Exception if we are not + * able to find the container. + */ + boolean isOpen(String containerName) throws StorageContainerException; + + /** + * Supports clean shutdown of container. + * + * @throws StorageContainerException + */ + void shutdown() throws IOException; + + /** + * Sets the Chunk Manager. + * + * @param chunkManager - ChunkManager. + */ + void setChunkManager(ChunkManager chunkManager); + + /** + * Gets the Chunk Manager. + * + * @return ChunkManager. + */ + ChunkManager getChunkManager(); + + /** + * Sets the Key Manager. + * + * @param keyManager - Key Manager. + */ + void setKeyManager(KeyManager keyManager); + + /** + * Gets the Key Manager. + * + * @return KeyManager. + */ + KeyManager getKeyManager(); + + /** + * Get the Node Report of container storage usage. + * @return node report. + */ + SCMNodeReport getNodeReport() throws IOException; + + /** + * Gets container report. + * @return container report. + * @throws IOException + */ + ContainerReportsRequestProto getContainerReport() throws IOException; + + /** + * Gets container reports. + * @return List of all closed containers. + * @throws IOException + */ + List<ContainerData> getContainerReports() throws IOException; + + /** + * Increase pending deletion blocks count number of specified container. + * + * @param numBlocks + * increment count number + * @param containerId + * container id + */ + void incrPendingDeletionBlocks(int numBlocks, String containerId); + + /** + * Decrease pending deletion blocks count number of specified container. + * + * @param numBlocks + * decrement count number + * @param containerId + * container id + */ + void decrPendingDeletionBlocks(int numBlocks, String containerId); + + /** + * Increase the read count of the container. + * @param containerName - Name of the container. + */ + void incrReadCount(String containerName); + + /** + * Increse the read counter for bytes read from the container. + * @param containerName - Name of the container. + * @param readBytes - bytes read from the container. + */ + void incrReadBytes(String containerName, long readBytes); + + + /** + * Increase the write count of the container. + * @param containerName - Name of the container. + */ + void incrWriteCount(String containerName); + + /** + * Increase the write counter for bytes write into the container. + * @param containerName - Name of the container. + * @param writeBytes - bytes write into the container. + */ + void incrWriteBytes(String containerName, long writeBytes); + + /** + * Increase the bytes used by the container. + * @param containerName - Name of the container. + * @param used - additional bytes used by the container. + * @return the current bytes used. + */ + long incrBytesUsed(String containerName, long used); + + /** + * Decrease the bytes used by the container. + * @param containerName - Name of the container. + * @param used - additional bytes reclaimed by the container. + * @return the current bytes used. + */ + long decrBytesUsed(String containerName, long used); + + /** + * Get the bytes used by the container. + * @param containerName - Name of the container. + * @return the current bytes used by the container. + */ + long getBytesUsed(String containerName); + + /** + * Get the number of keys in the container. + * @param containerName - Name of the container. + * @return the current key count. + */ + long getNumKeys(String containerName); + + /** + * Get the container report state to send via HB to SCM. + * @return container report state. + */ + ReportState getContainerReportState(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java new file mode 100644 index 0000000..4689dfe --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; + +/** + * Interface for container report manager operations. + */ +public interface ContainerReportManager { + + /** + * Get the container report state. + * @return the container report state. + */ + ReportState getContainerReportState(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java new file mode 100644 index 0000000..8c27ba9 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; + +import java.io.IOException; +import java.util.List; + +/** + * KeyManager deals with Key Operations in the container Level. + */ +public interface KeyManager { + /** + * Puts or overwrites a key. + * + * @param pipeline - Pipeline. + * @param data - Key Data. + * @throws IOException + */ + void putKey(Pipeline pipeline, KeyData data) throws IOException; + + /** + * Gets an existing key. + * + * @param data - Key Data. + * @return Key Data. + * @throws IOException + */ + KeyData getKey(KeyData data) throws IOException; + + /** + * Deletes an existing Key. + * + * @param pipeline - Pipeline. + * @param keyName Key Data. + * @throws StorageContainerException + */ + void deleteKey(Pipeline pipeline, String keyName) + throws IOException; + + /** + * List keys in a container. + * + * @param pipeline - pipeline. + * @param prefix - Prefix in needed. + * @param startKey - Key to start from, EMPTY_STRING to begin. + * @param count - Number of keys to return. + * @return List of Keys that match the criteria. + */ + List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey, + int count) throws IOException; + + /** + * Shutdown keyManager. + */ + void shutdown(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java new file mode 100644 index 0000000..d83bf95 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; +/** + This package contains common ozone container interfaces. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java new file mode 100644 index 0000000..1638a36 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common; +/** + Common Container Layer. At this layer the abstractions are: + + 1. Containers - Both data and metadata containers. + 2. Keys - Key/Value pairs that live inside a container. + 3. Chunks - Keys can be composed of many chunks. + + Ozone uses these abstractions to build Volumes, Buckets and Keys. + + **/ \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org