HDFS-10268. Ozone: end-to-end integration for create/get volumes, buckets and keys. Contributed by Chris Nauroth.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fedb22d9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fedb22d9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fedb22d9 Branch: refs/heads/HDFS-7240 Commit: fedb22d9b642da94f6cd3fb79239924708ec34eb Parents: b3044db Author: Anu Engineer <aengin...@apache.org> Authored: Thu Apr 7 14:38:54 2016 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Thu Apr 7 14:38:54 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/server/datanode/DataNode.java | 25 +- .../server/datanode/ObjectStoreHandler.java | 60 ++++- .../org/apache/hadoop/ozone/OzoneConsts.java | 2 + .../container/common/helpers/ChunkUtils.java | 3 +- .../ozone/container/common/impl/Dispatcher.java | 6 +- .../common/transport/client/XceiverClient.java | 14 +- .../transport/client/XceiverClientHandler.java | 2 +- .../transport/client/XceiverClientManager.java | 83 ++++++ .../common/transport/client/package-info.java | 24 ++ .../common/transport/server/package-info.java | 24 ++ .../ozone/storage/StorageContainerManager.java | 110 +++++++- .../hadoop/ozone/web/client/OzoneBucket.java | 9 +- .../ozone/web/exceptions/OzoneException.java | 2 +- .../hadoop/ozone/web/request/OzoneQuota.java | 10 + .../ozone/web/storage/ChunkInputStream.java | 193 ++++++++++++++ .../ozone/web/storage/ChunkOutputStream.java | 193 ++++++++++++++ .../web/storage/ContainerProtocolCalls.java | 198 ++++++++++++++ .../web/storage/DistributedStorageHandler.java | 266 +++++++++++++++---- .../web/storage/OzoneContainerTranslation.java | 261 ++++++++++++++++++ .../apache/hadoop/ozone/MiniOzoneCluster.java | 71 +++-- .../ozone/web/TestOzoneRestWithMiniCluster.java | 253 ++++++++++++++++++ 21 files changed, 1695 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index a5d5015..ff71653 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1846,16 +1846,6 @@ public class DataNode extends ReconfigurableBase public void shutdown() { stopMetricsLogger(); - if(this.ozoneEnabled) { - if(ozoneServer != null) { - try { - ozoneServer.stop(); - } catch (Exception e) { - LOG.error("Error is ozone shutdown. ex {}", e.toString()); - } - } - } - if (plugins != null) { for (ServicePlugin p : plugins) { try { @@ -1914,6 +1904,21 @@ public class DataNode extends ReconfigurableBase } } + // Stop the object store handler + if (this.objectStoreHandler != null) { + this.objectStoreHandler.close(); + } + + if(this.ozoneEnabled) { + if(ozoneServer != null) { + try { + ozoneServer.stop(); + } catch (Exception e) { + LOG.error("Error is ozone shutdown. ex {}", e.toString()); + } + } + } + if (pauseMonitor != null) { pauseMonitor.stop(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java index 6413ac0..b8c6a13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java @@ -17,36 +17,58 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS; -import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_DEFAULT_PORT; +import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS; +import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE; +import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import com.sun.jersey.api.container.ContainerFactory; import com.sun.jersey.api.core.ApplicationAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.web.handlers.ServiceFilter; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.ObjectStoreApplication; import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler; import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler; +import org.apache.hadoop.security.UserGroupInformation; /** * Implements object store handling within the DataNode process. This class is * responsible for initializing and maintaining the RPC clients and servers and * the web application required for the object store implementation. */ -public final class ObjectStoreHandler { +public final class ObjectStoreHandler implements Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(ObjectStoreJerseyContainer.class); private final ObjectStoreJerseyContainer objectStoreJerseyContainer; + private final StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; /** * Creates a new ObjectStoreHandler. @@ -57,14 +79,32 @@ public final class ObjectStoreHandler { public ObjectStoreHandler(Configuration conf) throws IOException { String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY, DFS_STORAGE_HANDLER_TYPE_DEFAULT); + LOG.info("ObjectStoreHandler initializing with {}: {}", + DFS_STORAGE_HANDLER_TYPE_KEY, shType); boolean ozoneTrace = conf.getBoolean(DFS_OBJECTSTORE_TRACE_ENABLED_KEY, DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT); final StorageHandler storageHandler; + + // Initialize Jersey container for object store web application. if ("distributed".equalsIgnoreCase(shType)) { - storageHandler = new DistributedStorageHandler(); + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); + InetSocketAddress address = conf.getSocketAddr( + DFS_STORAGE_RPC_BIND_HOST_KEY, DFS_STORAGE_RPC_ADDRESS_KEY, + DFS_STORAGE_RPC_ADDRESS_DEFAULT, DFS_STORAGE_RPC_DEFAULT_PORT); + this.storageContainerLocationClient = + new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf))); + storageHandler = new DistributedStorageHandler(new OzoneConfiguration(), + this.storageContainerLocationClient); } else { if ("local".equalsIgnoreCase(shType)) { storageHandler = new LocalStorageHandler(conf); + this.storageContainerLocationClient = null; } else { throw new IllegalArgumentException( String.format("Unrecognized value for %s: %s", @@ -91,4 +131,12 @@ public final class ObjectStoreHandler { public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() { return this.objectStoreJerseyContainer; } + + @Override + public void close() { + LOG.info("Closing ObjectStoreHandler."); + if (this.storageContainerLocationClient != null) { + this.storageContainerLocationClient.close(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index bebbb78..1ffaa2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -68,6 +68,8 @@ public final class OzoneConsts { public static final String FILE_HASH = "SHA-256"; public final static String CHUNK_OVERWRITE = "OverWriteRequested"; + public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB + /** * Supports Bucket Versioning. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java index 15e4524..b4c8aa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java @@ -169,7 +169,8 @@ public final class ChunkUtils { StandardOpenOption.SPARSE, StandardOpenOption.SYNC); lock = file.lock().get(); - if (!chunkInfo.getChecksum().isEmpty()) { + if (chunkInfo.getChecksum() != null && + !chunkInfo.getChecksum().isEmpty()) { verifyChecksum(chunkInfo, data, log); } int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index 66ff1ba..bad1d23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -127,7 +127,7 @@ public class Dispatcher implements ContainerDispatcher { msg.getCreateContainer().getContainerData().getName(), msg.getCmdType().name(), msg.getTraceID(), - ex.toString()); + ex.toString(), ex); // TODO : Replace with finer error codes. return ContainerUtils.getContainerResponse(msg, @@ -169,7 +169,7 @@ public class Dispatcher implements ContainerDispatcher { msg.getCreateContainer().getContainerData().getName(), msg.getCmdType().name(), msg.getTraceID(), - ex.toString()); + ex.toString(), ex); // TODO : Replace with finer error codes. return ContainerUtils.getContainerResponse(msg, @@ -210,7 +210,7 @@ public class Dispatcher implements ContainerDispatcher { msg.getCreateContainer().getContainerData().getName(), msg.getCmdType().name(), msg.getTraceID(), - ex.toString()); + ex.toString(), ex); // TODO : Replace with finer error codes. return ContainerUtils.getContainerResponse(msg, http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java index 3b9ba8d..e6d914a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java @@ -34,12 +34,13 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; /** * A Client for the storageContainer protocol. */ -public class XceiverClient { +public class XceiverClient implements Closeable { static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); private final Pipeline pipeline; private final Configuration config; @@ -92,6 +93,7 @@ public class XceiverClient { /** * Close the client. */ + @Override public void close() { if(group != null) { group.shutdownGracefully(); @@ -103,6 +105,16 @@ public class XceiverClient { } /** + * Returns the pipeline of machines that host the container used by this + * client. + * + * @return pipeline of machines that host the container + */ + public Pipeline getPipeline() { + return pipeline; + } + + /** * Sends a given command to server and gets the reply back. * @param request Request * @return Response to the command http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java index a219e4e..c9a3ad3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java @@ -94,7 +94,7 @@ public class XceiverClientHandler extends ContainerProtos.ContainerCommandResponseProto response; channel.writeAndFlush(request); boolean interrupted = false; - for (; ; ) { + for (;;) { try { response = responses.take(); break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java new file mode 100644 index 0000000..8123ae9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.client; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; + +/** + * XceiverClientManager is responsible for the lifecycle of XceiverClient + * instances. Callers use this class to acquire an XceiverClient instance + * connected to the desired container pipeline. When done, the caller also uses + * this class to release the previously acquired XceiverClient instance. + * + * This class may evolve to implement efficient lifecycle management policies by + * caching container location information and pooling connected client instances + * for reuse without needing to reestablish a socket connection. The current + * implementation simply allocates and closes a new instance every time. + */ +public class XceiverClientManager { + + private final OzoneConfiguration conf; + + /** + * Creates a new XceiverClientManager. + * + * @param conf configuration + */ + public XceiverClientManager(OzoneConfiguration conf) { + Preconditions.checkNotNull(conf); + this.conf = conf; + } + + /** + * Acquires a XceiverClient connected to a container capable of storing the + * specified key. + * + * @param pipeline the container pipeline for the client connection + * @return XceiverClient connected to a container + * @throws IOException if an XceiverClient cannot be acquired + */ + public XceiverClient acquireClient(Pipeline pipeline) throws IOException { + Preconditions.checkNotNull(pipeline); + Preconditions.checkArgument(pipeline.getMachines() != null); + Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); + XceiverClient xceiverClient = new XceiverClient(pipeline, conf); + try { + xceiverClient.connect(); + } catch (Exception e) { + throw new IOException("Exception connecting XceiverClient.", e); + } + return xceiverClient; + } + + /** + * Releases an XceiverClient after use. + * + * @param xceiverClient client to release + */ + public void releaseClient(XceiverClient xceiverClient) { + Preconditions.checkNotNull(xceiverClient); + xceiverClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java new file mode 100644 index 0000000..d3c0278 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.client; + +/** + * This package contains classes for the client of the storage container + * protocol. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java new file mode 100644 index 0000000..59c96f1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +/** + * This package contains classes for the server of the storage container + * protocol. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java index 90e200a..16863c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java @@ -48,10 +48,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -81,6 +86,10 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; import org.apache.hadoop.ozone.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; @@ -94,11 +103,14 @@ import org.apache.hadoop.util.StringUtils; * * The current implementation is a stub suitable to begin end-to-end testing of * Ozone service interactions. DataNodes report to StorageContainerManager - * using the existing heartbeat messages. StorageContainerManager tells clients - * container locations by reporting that all registered nodes are a viable - * location. This will evolve from a stub to a full-fledged implementation - * capable of partitioning the keyspace across multiple containers, with - * appropriate distribution across nodes. + * using the existing heartbeat messages. StorageContainerManager lazily + * initializes a single storage container to be served by those DataNodes. + * All subsequent requests for container locations will reply with that single + * pipeline, using all registered nodes. + * + * This will evolve from a stub to a full-fledged implementation capable of + * partitioning the keyspace across multiple containers, with appropriate + * distribution across nodes. */ @InterfaceAudience.Private public class StorageContainerManager @@ -109,6 +121,8 @@ public class StorageContainerManager private final StorageContainerNameService ns; private final BlockManager blockManager; + private final XceiverClientManager xceiverClientManager; + private Pipeline singlePipeline; /** The RPC server that listens to requests from DataNodes. */ private final RPC.Server serviceRpcServer; @@ -128,11 +142,12 @@ public class StorageContainerManager * * @param conf configuration */ - public StorageContainerManager(Configuration conf) + public StorageContainerManager(OzoneConfiguration conf) throws IOException { ns = new StorageContainerNameService(); boolean haEnabled = false; blockManager = new BlockManager(ns, haEnabled, conf); + xceiverClientManager = new XceiverClientManager(conf); RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, ProtobufRpcEngine.class); @@ -193,20 +208,20 @@ public class StorageContainerManager public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys) throws IOException { LOG.trace("getStorageContainerLocations keys = {}", keys); + Pipeline pipeline = initSingleContainerPipeline(); List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>(); blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); if (liveNodes.isEmpty()) { throw new IOException("Storage container locations not found."); } - String containerName = UUID.randomUUID().toString(); Set<DatanodeInfo> locations = Sets.<DatanodeInfo>newLinkedHashSet(liveNodes); DatanodeInfo leader = liveNodes.get(0); Set<LocatedContainer> locatedContainers = Sets.newLinkedHashSetWithExpectedSize(keys.size()); for (String key: keys) { - locatedContainers.add(new LocatedContainer(key, key, containerName, - locations, leader)); + locatedContainers.add(new LocatedContainer(key, key, + pipeline.getContainerName(), locations, leader)); } LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}", keys, locatedContainers); @@ -416,6 +431,56 @@ public class StorageContainerManager } /** + * Lazily initializes a single container pipeline using all registered + * DataNodes via a synchronous call to the container protocol. This single + * container pipeline will be reused for container requests for the lifetime + * of this StorageContainerManager. + * + * @throws IOException if there is an I/O error + */ + private synchronized Pipeline initSingleContainerPipeline() + throws IOException { + if (singlePipeline == null) { + List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>(); + blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); + if (liveNodes.isEmpty()) { + throw new IOException("Storage container locations not found."); + } + Pipeline newPipeline = newPipelineFromNodes(liveNodes, + UUID.randomUUID().toString()); + XceiverClient xceiverClient = + xceiverClientManager.acquireClient(newPipeline); + try { + ContainerData containerData = ContainerData + .newBuilder() + .setName(newPipeline.getContainerName()) + .build(); + CreateContainerRequestProto createContainerRequest = + CreateContainerRequestProto.newBuilder() + .setPipeline(newPipeline.getProtobufMessage()) + .setContainerData(containerData) + .build(); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.CreateContainer) + .setCreateContainer(createContainerRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand( + request); + Result result = response.getResult(); + if (result != Result.SUCCESS) { + throw new IOException( + "Failed to initialize container due to result code: " + result); + } + singlePipeline = newPipeline; + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } + } + return singlePipeline; + } + + /** * Builds a message for logging startup information about an RPC server. * * @param description RPC server description @@ -430,6 +495,25 @@ public class StorageContainerManager } /** + * Translates a list of nodes, ordered such that the first is the leader, into + * a corresponding {@link Pipeline} object. + * + * @param nodes list of nodes + * @param containerName container name + * @return pipeline corresponding to nodes + */ + private static Pipeline newPipelineFromNodes(List<DatanodeDescriptor> nodes, + String containerName) { + String leaderId = nodes.get(0).getDatanodeUuid(); + Pipeline pipeline = new Pipeline(leaderId); + for (DatanodeDescriptor node : nodes) { + pipeline.addMember(node); + } + pipeline.setContainerName(containerName); + return pipeline; + } + + /** * Starts an RPC server, if configured. * * @param conf configuration @@ -443,7 +527,7 @@ public class StorageContainerManager * @return RPC server, or null if addr is null * @throws IOException if there is an I/O error while creating RPC server */ - private static RPC.Server startRpcServer(Configuration conf, + private static RPC.Server startRpcServer(OzoneConfiguration conf, InetSocketAddress addr, Class<?> protocol, BlockingService instance, String bindHostKey, String handlerCountKey, int handlerCountDefault) throws IOException { @@ -480,7 +564,7 @@ public class StorageContainerManager * @param rpcServer started RPC server. If null, then the server was not * started, and this method is a no-op. */ - private static InetSocketAddress updateListenAddress(Configuration conf, + private static InetSocketAddress updateListenAddress(OzoneConfiguration conf, String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { if (rpcServer == null) { return null; @@ -502,7 +586,7 @@ public class StorageContainerManager StringUtils.startupShutdownMessage( StorageContainerManager.class, argv, LOG); StorageContainerManager scm = new StorageContainerManager( - new Configuration()); + new OzoneConfiguration()); scm.start(); scm.join(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java index 4df303a..3441bf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java @@ -174,9 +174,12 @@ public class OzoneBucket { InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING)); putRequest.setEntity(new InputStreamEntity(is, data.length())); - putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is)); - putRequest - .setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length())); + is.mark(data.length()); + try { + putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is)); + } finally { + is.reset(); + } executePutKey(putRequest, httpClient); } catch (IOException | URISyntaxException ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java index 2bd4a69..c2e64da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java @@ -92,7 +92,7 @@ public class OzoneException extends Exception { */ public OzoneException(long httpCode, String shortMessage, String message) { this.shortMessage = shortMessage; - this.resource = message; + this.message = message; this.httpCode = httpCode; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java index ca6ddeb..501b239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java @@ -78,6 +78,16 @@ public class OzoneQuota { } /** + * Formats a quota as a string. + * + * @param quota the quota to format + * @return string representation of quota + */ + public static String formatQuota(OzoneQuota quota) { + return String.valueOf(quota.size) + quota.unit; + } + + /** * Parses a user provided string and returns the * Quota Object. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java new file mode 100644 index 0000000..166e71c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.UserArgs; + +/** + * An {@link InputStream} used by the REST service in combination with the + * {@link DistributedStorageHandler} to read the value of a key from a sequence + * of container chunks. All bytes of the key value are stored in container + * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} + * instances. This class encapsulates all state management for iterating + * through the sequence of chunks and the sequence of buffers within each chunk. + */ +class ChunkInputStream extends InputStream { + + private static final int EOF = -1; + + private final String key; + private final UserArgs args; + private XceiverClientManager xceiverClientManager; + private XceiverClient xceiverClient; + private List<ChunkInfo> chunks; + private int chunkOffset; + private List<ByteBuffer> buffers; + private int bufferOffset; + + /** + * Creates a new ChunkInputStream. + * + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param chunks list of chunks to read + * @param args container protocol call args + */ + public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, + XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) { + this.key = key; + this.args = args; + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.chunks = chunks; + this.chunkOffset = 0; + this.buffers = null; + this.bufferOffset = 0; + } + + @Override + public synchronized int read() + throws IOException { + checkOpen(); + int available = prepareRead(1); + return available == EOF ? EOF : buffers.get(bufferOffset).get(); + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + // According to the JavaDocs for InputStream, it is recommended that + // subclasses provide an override of bulk read if possible for performance + // reasons. In addition to performance, we need to do it for correctness + // reasons. The Ozone REST service uses PipedInputStream and + // PipedOutputStream to relay HTTP response data between a Jersey thread and + // a Netty thread. It turns out that PipedInputStream/PipedOutputStream + // have a subtle dependency (bug?) on the wrapped stream providing separate + // implementations of single-byte read and bulk read. Without this, get key + // responses might close the connection before writing all of the bytes + // advertised in the Content-Length. + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + checkOpen(); + int available = prepareRead(len); + if (available == EOF) { + return EOF; + } + buffers.get(bufferOffset).get(b, off, available); + return available; + } + + @Override + public synchronized void close() { + if (xceiverClientManager != null && xceiverClient != null) { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + } + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkInputStream has been closed."); + } + } + + /** + * Prepares to read by advancing through chunks and buffers as needed until it + * finds data to return or encounters EOF. + * + * @param len desired length of data to read + * @return length of data available to read, possibly less than desired length + */ + private synchronized int prepareRead(int len) throws IOException { + for (;;) { + if (chunks == null || chunks.isEmpty()) { + // This must be an empty key. + return EOF; + } else if (buffers == null) { + // The first read triggers fetching the first chunk. + readChunkFromContainer(0); + } else if (!buffers.isEmpty() && + buffers.get(bufferOffset).hasRemaining()) { + // Data is available from the current buffer. + ByteBuffer bb = buffers.get(bufferOffset); + return len > bb.remaining() ? bb.remaining() : len; + } else if (!buffers.isEmpty() && + !buffers.get(bufferOffset).hasRemaining() && + bufferOffset < buffers.size() - 1) { + // There are additional buffers available. + ++bufferOffset; + } else if (chunkOffset < chunks.size() - 1) { + // There are additional chunks available. + readChunkFromContainer(chunkOffset + 1); + } else { + // All available input has been consumed. + return EOF; + } + } + } + + /** + * Attempts to read the chunk at the specified offset in the chunk list. If + * successful, then the data of the read chunk is saved so that its bytes can + * be returned from subsequent read calls. + * + * @param readChunkOffset offset in the chunk list of which chunk to read + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void readChunkFromContainer(int readChunkOffset) + throws IOException { + final ReadChunkResponseProto readChunkResponse; + try { + readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset), + key, args); + } catch (OzoneException e) { + throw new IOException("Unexpected OzoneException", e); + } + chunkOffset = readChunkOffset; + ByteString byteString = readChunkResponse.getData(); + buffers = byteString.asReadOnlyByteBufferList(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java new file mode 100644 index 0000000..d4e639f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE; +import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.UUID; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.UserArgs; +import org.apache.hadoop.ozone.web.response.KeyInfo; + +/** + * An {@link OutputStream} used by the REST service in combination with the + * {@link DistributedStorageHandler} to write the value of a key to a sequence + * of container chunks. Writes are buffered locally and periodically written to + * the container as a new chunk. In order to preserve the semantics that + * replacement of a pre-existing key is atomic, each instance of the stream has + * an internal unique identifier. This unique identifier and a monotonically + * increasing chunk index form a composite key that is used as the chunk name. + * After all data is written, a putKey call creates or updates the corresponding + * container key, and this call includes the full list of chunks that make up + * the key data. The list of chunks is updated all at once. Therefore, a + * concurrent reader never can see an intermediate state in which different + * chunks of data from different versions of the key data are interleaved. + * This class encapsulates all state management for buffering and writing + * through to the container. + */ +class ChunkOutputStream extends OutputStream { + + private final String containerKey; + private final KeyInfo key; + private final UserArgs args; + private final KeyData.Builder containerKeyData; + private XceiverClientManager xceiverClientManager; + private XceiverClient xceiverClient; + private ByteBuffer buffer; + private final String streamId; + private int chunkIndex; + + /** + * Creates a new ChunkOutputStream. + * + * @param containerKey container key + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param args container protocol call args + */ + public ChunkOutputStream(String containerKey, KeyInfo key, + XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, + UserArgs args) { + this.containerKey = containerKey; + this.key = key; + this.args = args; + this.containerKeyData = fromKeyToContainerKeyDataBuilder( + xceiverClient.getPipeline().getContainerName(), containerKey, key); + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.buffer = ByteBuffer.allocate(CHUNK_SIZE); + this.streamId = UUID.randomUUID().toString(); + this.chunkIndex = 0; + } + + @Override + public synchronized void write(int b) throws IOException { + checkOpen(); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put((byte)b); + if (buffer.position() == CHUNK_SIZE) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public synchronized void flush() throws IOException { + checkOpen(); + if (buffer.position() > 0) { + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public synchronized void close() throws IOException { + if (xceiverClientManager != null && xceiverClient != null && + buffer != null) { + try { + if (buffer.position() > 0) { + writeChunkToContainer(); + } + putKey(xceiverClient, containerKeyData.build(), args); + } catch (OzoneException e) { + throw new IOException("Unexpected OzoneException", e); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + buffer = null; + } + } + + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkOutputStream has been closed."); + } + } + + /** + * Attempts to flush buffered writes by writing a new chunk to the container. + * If successful, then clears the buffer to prepare to receive writes for a + * new chunk. + * + * @param rollbackPosition position to restore in buffer if write fails + * @param rollbackLimit limit to restore in buffer if write fails + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void flushBufferToChunk(int rollbackPosition, + int rollbackLimit) throws IOException { + boolean success = false; + try { + writeChunkToContainer(); + success = true; + } finally { + if (success) { + buffer.clear(); + } else { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + } + } + } + + /** + * Writes buffered data as a new chunk to the container and saves chunk + * information to be used later in putKey call. + * + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void writeChunkToContainer() throws IOException { + buffer.flip(); + ByteString data = ByteString.copyFrom(buffer); + ChunkInfo chunk = ChunkInfo + .newBuilder() + .setChunkName( + key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex) + .setOffset(0) + .setLen(data.size()) + .build(); + try { + writeChunk(xceiverClient, chunk, key.getKeyName(), data, args); + } catch (OzoneException e) { + throw new IOException("Unexpected OzoneException", e); + } + containerKeyData.addChunks(chunk); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java new file mode 100644 index 0000000..4cb3ab9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; + +import java.io.IOException; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.web.exceptions.ErrorTable; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.UserArgs; + +/** + * Implementation of all container protocol calls performed by + * {@link DistributedStorageHandler}. + */ +final class ContainerProtocolCalls { + + /** + * Calls the container protocol to get a container key. + * + * @param xceiverClient client to perform call + * @param containerKeyData key data to identify container + * @param args container protocol call args + * @returns container protocol get key response + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static GetKeyResponseProto getKey(XceiverClient xceiverClient, + KeyData containerKeyData, UserArgs args) throws IOException, + OzoneException { + GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyData(containerKeyData); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.GetKey) + .setTraceID(args.getRequestID()) + .setGetKey(readKeyRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + return response.getGetKey(); + } + + /** + * Calls the container protocol to put a container key. + * + * @param xceiverClient client to perform call + * @param containerKeyData key data to identify container + * @param args container protocol call args + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static void putKey(XceiverClient xceiverClient, + KeyData containerKeyData, UserArgs args) throws IOException, + OzoneException { + PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyData(containerKeyData); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.PutKey) + .setTraceID(args.getRequestID()) + .setPutKey(createKeyRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + } + + /** + * Calls the container protocol to read a chunk. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to read + * @param key the key name + * @param args container protocol call args + * @returns container protocol read chunk response + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, + ChunkInfo chunk, String key, UserArgs args) + throws IOException, OzoneException { + ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyName(key) + .setChunkData(chunk); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.ReadChunk) + .setTraceID(args.getRequestID()) + .setReadChunk(readChunkRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + return response.getReadChunk(); + } + + /** + * Calls the container protocol to write a chunk. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to write + * @param key the key name + * @param data the data of the chunk to write + * @param args container protocol call args + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, + String key, ByteString data, UserArgs args) + throws IOException, OzoneException { + WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyName(key) + .setChunkData(chunk) + .setData(data); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.WriteChunk) + .setTraceID(args.getRequestID()) + .setWriteChunk(writeChunkRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + } + + /** + * Validates a response from a container protocol call. Any non-successful + * return code is mapped to a corresponding exception and thrown. + * + * @param response container protocol call response + * @param args container protocol call args + * @throws OzoneException if the container protocol call failed + */ + private static void validateContainerResponse( + ContainerCommandResponseProto response, UserArgs args) + throws OzoneException { + switch (response.getResult()) { + case SUCCESS: + break; + case MALFORMED_REQUEST: + throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST, + "badRequest", "Bad container request."), args); + case UNSUPPORTED_REQUEST: + throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, + "internalServerError", "Unsupported container request."), args); + case CONTAINER_INTERNAL_ERROR: + throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, + "internalServerError", "Container internal error."), args); + default: + throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, + "internalServerError", "Unrecognized container response."), args); + } + } + + /** + * There is no need to instantiate this class. + */ + private ContainerProtocolCalls() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 9cb8430..8d4868c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -18,7 +18,32 @@ package org.apache.hadoop.ozone.web.storage; +import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; + +import java.io.IOException; +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.TimeZone; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.handlers.BucketArgs; import org.apache.hadoop.ozone.web.handlers.KeyArgs; @@ -27,13 +52,13 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.response.BucketInfo; +import org.apache.hadoop.ozone.web.response.KeyInfo; import org.apache.hadoop.ozone.web.response.ListBuckets; import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.VolumeInfo; - -import java.io.IOException; -import java.io.OutputStream; +import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.util.StringUtils; /** * A {@link StorageHandler} implementation that distributes object storage @@ -41,156 +66,283 @@ import java.io.OutputStream; */ public final class DistributedStorageHandler implements StorageHandler { - @Override - public void createVolume(VolumeArgs args) throws - IOException, OzoneException { + private final StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocation; + private final XceiverClientManager xceiverClientManager; + + /** + * Creates a new DistributedStorageHandler. + * + * @param conf configuration + * @param storageContainerLocation StorageContainerLocationProtocol proxy + */ + public DistributedStorageHandler(OzoneConfiguration conf, + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocation) { + this.storageContainerLocation = storageContainerLocation; + this.xceiverClientManager = new XceiverClientManager(conf); + } + @Override + public void createVolume(VolumeArgs args) throws IOException, OzoneException { + String containerKey = buildContainerKey(args.getVolumeName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + VolumeInfo volume = new VolumeInfo(); + volume.setVolumeName(args.getVolumeName()); + volume.setQuota(args.getQuota()); + volume.setOwner(new VolumeOwner(args.getUserName())); + volume.setCreatedOn(dateToString(new Date())); + volume.setCreatedBy(args.getAdminName()); + KeyData containerKeyData = fromVolumeToContainerKeyData( + xceiverClient.getPipeline().getContainerName(), containerKey, volume); + putKey(xceiverClient, containerKeyData, args); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } @Override public void setVolumeOwner(VolumeArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("setVolumeOwner not implemented"); } @Override public void setVolumeQuota(VolumeArgs args, boolean remove) throws IOException, OzoneException { - + throw new UnsupportedOperationException("setVolumeQuota not implemented"); } @Override public boolean checkVolumeAccess(VolumeArgs args) throws IOException, OzoneException { - return false; + throw new UnsupportedOperationException("checkVolumeAccessnot implemented"); } @Override public ListVolumes listVolumes(UserArgs args) throws IOException, OzoneException { - return null; + throw new UnsupportedOperationException("listVolumes not implemented"); } @Override public void deleteVolume(VolumeArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("deleteVolume not implemented"); } @Override public VolumeInfo getVolumeInfo(VolumeArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + KeyData containerKeyData = containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, + args); + return fromContainerKeyValueListToVolume( + response.getKeyData().getMetadataList()); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } @Override - public void createBucket(BucketArgs args) + public void createBucket(final BucketArgs args) throws IOException, OzoneException { - + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + BucketInfo bucket = new BucketInfo(); + bucket.setVolumeName(args.getVolumeName()); + bucket.setBucketName(args.getBucketName()); + bucket.setAcls(args.getAddAcls()); + bucket.setVersioning(args.getVersioning()); + bucket.setStorageType(args.getStorageType()); + KeyData containerKeyData = fromBucketToContainerKeyData( + xceiverClient.getPipeline().getContainerName(), containerKey, bucket); + putKey(xceiverClient, containerKeyData, args); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } @Override public void setBucketAcls(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("setBucketAcls not implemented"); } @Override public void setBucketVersioning(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException( + "setBucketVersioning not implemented"); } @Override public void setBucketStorageClass(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException( + "setBucketStorageClass not implemented"); } @Override public void deleteBucket(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("deleteBucket not implemented"); } @Override public void checkBucketAccess(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException( + "checkBucketAccess not implemented"); } @Override public ListBuckets listBuckets(VolumeArgs args) throws IOException, OzoneException { - return null; + throw new UnsupportedOperationException("listBuckets not implemented"); } @Override public BucketInfo getBucketInfo(BucketArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + KeyData containerKeyData = containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, + args); + return fromContainerKeyValueListToBucket( + response.getKeyData().getMetadataList()); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } - /** - * Writes a key in an existing bucket. - * - * @param args KeyArgs - * @return InputStream - * @throws OzoneException - */ @Override public OutputStream newKeyWriter(KeyArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName(), args.getKeyName()); + KeyInfo key = new KeyInfo(); + key.setKeyName(args.getKeyName()); + key.setCreatedOn(dateToString(new Date())); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + return new ChunkOutputStream(containerKey, key, xceiverClientManager, + xceiverClient, args); } - /** - * Tells the file system that the object has been written out completely and - * it can do any house keeping operation that needs to be done. - * - * @param args Key Args - * @param stream - * @throws IOException - */ @Override public void commitKey(KeyArgs args, OutputStream stream) throws IOException, OzoneException { + stream.close(); + } + + @Override + public LengthInputStream newKeyReader(KeyArgs args) throws IOException, + OzoneException { + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName(), args.getKeyName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + boolean success = false; + try { + KeyData containerKeyData = containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, + args); + long length = 0; + List<ChunkInfo> chunks = response.getKeyData().getChunksList(); + for (ChunkInfo chunk : chunks) { + length += chunk.getLen(); + } + success = true; + return new LengthInputStream(new ChunkInputStream( + containerKey, xceiverClientManager, xceiverClient, chunks, args), + length); + } finally { + if (!success) { + xceiverClientManager.releaseClient(xceiverClient); + } + } + } + + @Override + public void deleteKey(KeyArgs args) throws IOException, OzoneException { + throw new UnsupportedOperationException("deleteKey not implemented"); + } + @Override + public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { + throw new UnsupportedOperationException("listKeys not implemented"); } /** - * Reads a key from an existing bucket. + * Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes + * capable of serving container protocol operations. The container is + * selected based on the specified container key. * - * @param args KeyArgs - * @return LengthInputStream - * @throws IOException + * @param containerKey container key + * @return XceiverClient connected to a container + * @throws IOException if an XceiverClient cannot be acquired */ - @Override - public LengthInputStream newKeyReader(KeyArgs args) throws IOException, - OzoneException { - return null; + private XceiverClient acquireXceiverClient(String containerKey) + throws IOException { + Set<LocatedContainer> locatedContainers = + storageContainerLocation.getStorageContainerLocations( + new HashSet<>(Arrays.asList(containerKey))); + Pipeline pipeline = newPipelineFromLocatedContainer( + locatedContainers.iterator().next()); + return xceiverClientManager.acquireClient(pipeline); } /** - * Deletes an existing key. + * Creates a container key from any number of components by combining all + * components with a delimiter. * - * @param args KeyArgs - * @throws OzoneException + * @param parts container key components + * @return container key */ - @Override - public void deleteKey(KeyArgs args) throws IOException, OzoneException { + private static String buildContainerKey(String... parts) { + return '/' + StringUtils.join('/', parts); + } + /** + * Formats a date in the expected string format. + * + * @param date the date to format + * @return formatted string representation of date + */ + private static String dateToString(Date date) { + SimpleDateFormat sdf = + new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE)); + return sdf.format(date); } /** - * Returns a list of Key. + * Translates a set of container locations, ordered such that the first is the + * leader, into a corresponding {@link Pipeline} object. * - * @param args KeyArgs - * @return BucketList - * @throws IOException + * @param locatedContainer container location + * @return pipeline corresponding to container locations */ - @Override - public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { - return null; + private static Pipeline newPipelineFromLocatedContainer( + LocatedContainer locatedContainer) { + Set<DatanodeInfo> locations = locatedContainer.getLocations(); + String leaderId = locations.iterator().next().getDatanodeUuid(); + Pipeline pipeline = new Pipeline(leaderId); + for (DatanodeInfo location : locations) { + pipeline.addMember(location); + } + pipeline.setContainerName(locatedContainer.getContainerName()); + return pipeline; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java new file mode 100644 index 0000000..9333fe6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import java.util.List; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.ozone.OzoneConsts.Versioning; +import org.apache.hadoop.ozone.web.request.OzoneQuota; +import org.apache.hadoop.ozone.web.response.BucketInfo; +import org.apache.hadoop.ozone.web.response.KeyInfo; +import org.apache.hadoop.ozone.web.response.VolumeInfo; +import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.util.StringUtils; + +/** + * This class contains methods that define the translation between the Ozone + * domain model and the storage container domain model. + */ +final class OzoneContainerTranslation { + + private static final String ACLS = "ACLS"; + private static final String BUCKET = "BUCKET"; + private static final String BUCKET_NAME = "BUCKET_NAME"; + private static final String CREATED_BY = "CREATED_BY"; + private static final String CREATED_ON = "CREATED_ON"; + private static final String KEY = "KEY"; + private static final String OWNER = "OWNER"; + private static final String QUOTA = "QUOTA"; + private static final String STORAGE_TYPE = "STORAGE_TYPE"; + private static final String TYPE = "TYPE"; + private static final String VERSIONING = "VERSIONING"; + private static final String VOLUME = "VOLUME"; + private static final String VOLUME_NAME = "VOLUME_NAME"; + + /** + * Creates key data intended for reading a container key. + * + * @param containerName container name + * @param containerKey container key + * @return KeyData intended for reading the container key + */ + public static KeyData containerKeyDataForRead(String containerName, + String containerKey) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .build(); + } + + /** + * Translates a bucket to its container representation. + * + * @param containerName container name + * @param containerKey container key + * @param bucket the bucket to translate + * @return KeyData representation of bucket + */ + public static KeyData fromBucketToContainerKeyData( + String containerName, String containerKey, BucketInfo bucket) { + KeyData.Builder containerKeyData = KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, BUCKET)) + .addMetadata(newKeyValue(VOLUME_NAME, bucket.getVolumeName())) + .addMetadata(newKeyValue(BUCKET_NAME, bucket.getBucketName())); + + if (bucket.getAcls() != null) { + containerKeyData.addMetadata(newKeyValue(ACLS, + StringUtils.join(',', bucket.getAcls()))); + } + + if (bucket.getVersioning() != null && + bucket.getVersioning() != Versioning.NOT_DEFINED) { + containerKeyData.addMetadata(newKeyValue(VERSIONING, + bucket.getVersioning().name())); + } + + if (bucket.getStorageType() != StorageType.RAM_DISK) { + containerKeyData.addMetadata(newKeyValue(STORAGE_TYPE, + bucket.getStorageType().name())); + } + + return containerKeyData.build(); + } + + /** + * Translates a bucket from its container representation. + * + * @param metadata container metadata representing the bucket + * @return bucket translated from container representation + */ + public static BucketInfo fromContainerKeyValueListToBucket( + List<KeyValue> metadata) { + BucketInfo bucket = new BucketInfo(); + for (KeyValue keyValue : metadata) { + switch (keyValue.getKey()) { + case VOLUME_NAME: + bucket.setVolumeName(keyValue.getValue()); + break; + case BUCKET_NAME: + bucket.setBucketName(keyValue.getValue()); + break; + case VERSIONING: + bucket.setVersioning( + Enum.valueOf(Versioning.class, keyValue.getValue())); + break; + case STORAGE_TYPE: + bucket.setStorageType( + Enum.valueOf(StorageType.class, keyValue.getValue())); + break; + default: + break; + } + } + return bucket; + } + + /** + * Translates a volume from its container representation. + * + * @param metadata container metadata representing the volume + * @return volume translated from container representation + */ + public static VolumeInfo fromContainerKeyValueListToVolume( + List<KeyValue> metadata) { + VolumeInfo volume = new VolumeInfo(); + for (KeyValue keyValue : metadata) { + switch (keyValue.getKey()) { + case VOLUME_NAME: + volume.setVolumeName(keyValue.getValue()); + break; + case CREATED_BY: + volume.setCreatedBy(keyValue.getValue()); + break; + case CREATED_ON: + volume.setCreatedOn(keyValue.getValue()); + break; + case OWNER: + volume.setOwner(new VolumeOwner(keyValue.getValue())); + break; + case QUOTA: + volume.setQuota(OzoneQuota.parseQuota(keyValue.getValue())); + break; + default: + break; + } + } + return volume; + } + + /** + * Translates a key to its container representation. + * + * @param containerName container name + * @param containerKey container key + * @param keyInfo key information received from call + * @return KeyData intended for reading the container key + */ + public static KeyData fromKeyToContainerKeyData(String containerName, + String containerKey, KeyInfo key) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, KEY)) + .build(); + } + + /** + * Translates a key to its container representation. The return value is a + * builder that can be manipulated further before building the result. + * + * @param containerName container name + * @param containerKey container key + * @param keyInfo key information received from call + * @return KeyData builder + */ + public static KeyData.Builder fromKeyToContainerKeyDataBuilder( + String containerName, String containerKey, KeyInfo key) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, KEY)); + } + + /** + * Translates a volume to its container representation. + * + * @param containerName container name + * @param containerKey container key + * @param volume the volume to translate + * @return KeyData representation of volume + */ + public static KeyData fromVolumeToContainerKeyData( + String containerName, String containerKey, VolumeInfo volume) { + KeyData.Builder containerKeyData = KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, VOLUME)) + .addMetadata(newKeyValue(VOLUME_NAME, volume.getVolumeName())) + .addMetadata(newKeyValue(CREATED_ON, volume.getCreatedOn())); + + if (volume.getQuota() != null && volume.getQuota().sizeInBytes() != -1L) { + containerKeyData.addMetadata(newKeyValue(QUOTA, + OzoneQuota.formatQuota(volume.getQuota()))); + } + + if (volume.getOwner() != null && volume.getOwner().getName() != null && + !volume.getOwner().getName().isEmpty()) { + containerKeyData.addMetadata(newKeyValue(OWNER, + volume.getOwner().getName())); + } + + if (volume.getCreatedBy() != null && !volume.getCreatedBy().isEmpty()) { + containerKeyData.addMetadata( + newKeyValue(CREATED_BY, volume.getCreatedBy())); + } + + return containerKeyData.build(); + } + + /** + * Translates a key-value pair to its container representation. + * + * @param key the key + * @param value the value + * @return container representation of key-value pair + */ + private static KeyValue newKeyValue(String key, Object value) { + return KeyValue.newBuilder().setKey(key).setValue(value.toString()).build(); + } + + /** + * There is no need to instantiate this class. + */ + private OzoneContainerTranslation() { + } +}