http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 35906c6..0af3c4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -189,10 +189,8 @@ public interface HdfsServerConstants { return NamenodeRole.NAMENODE; } } - + public void setClusterId(String cid) { - Preconditions.checkState(this == UPGRADE || this == UPGRADEONLY - || this == FORMAT); clusterId = cid; } @@ -217,7 +215,6 @@ public interface HdfsServerConstants { } public void setForce(int force) { - Preconditions.checkState(this == RECOVER); this.force = force; } @@ -230,7 +227,6 @@ public interface HdfsServerConstants { } public void setForceFormat(boolean force) { - Preconditions.checkState(this == FORMAT); isForceFormat = force; } @@ -239,7 +235,6 @@ public interface HdfsServerConstants { } public void setInteractiveFormat(boolean interactive) { - Preconditions.checkState(this == FORMAT); isInteractiveFormat = interactive; }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 1f51935..b757fd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -48,6 +48,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_P import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService; @@ -109,10 +110,8 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker; import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -191,7 +190,6 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import 
org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SaslPropertiesResolver; import org.apache.hadoop.security.SecurityUtil; @@ -373,7 +371,6 @@ public class DataNode extends ReconfigurableBase private final String confVersion; private final long maxNumberOfBlocksToLog; private final boolean pipelineSupportECN; - private final boolean ozoneEnabled; private final List<String> usersWithLocalPathAccess; private final boolean connectToDnViaHostname; @@ -402,8 +399,6 @@ public class DataNode extends ReconfigurableBase private final SocketFactory socketFactory; - private DatanodeStateMachine datanodeStateMachine; - private static Tracer createTracer(Configuration conf) { return new Tracer.Builder("DataNode"). conf(TraceUtils.wrapHadoopConf(DATANODE_HTRACE_PREFIX , conf)). @@ -414,8 +409,6 @@ public class DataNode extends ReconfigurableBase private ScheduledThreadPoolExecutor metricsLoggerTimer; - private ObjectStoreHandler objectStoreHandler = null; - /** * Creates a dummy DataNode for testing purpose. 
*/ @@ -434,7 +427,6 @@ public class DataNode extends ReconfigurableBase this.connectToDnViaHostname = false; this.blockScanner = new BlockScanner(this, this.getConf()); this.pipelineSupportECN = false; - this.ozoneEnabled = false; this.socketFactory = NetUtils.getDefaultSocketFactory(conf); this.dnConf = new DNConf(this); initOOBTimeout(); @@ -474,8 +466,6 @@ public class DataNode extends ReconfigurableBase DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT); - this.ozoneEnabled = DFSUtil.isOzoneEnabled(conf); - confVersion = "core-" + conf.get("hadoop.common.configuration.version", "UNSPECIFIED") + ",hdfs-" + @@ -531,7 +521,7 @@ public class DataNode extends ReconfigurableBase @Override // ReconfigurableBase protected Configuration getNewConf() { - return new OzoneConfiguration(); + return new HdfsConfiguration(); } /** @@ -961,8 +951,8 @@ public class DataNode extends ReconfigurableBase // the DN is started by JSVC, pass it along. ServerSocketChannel httpServerChannel = secureResources != null ? secureResources.getHttpServerChannel() : null; - this.httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel, - this.objectStoreHandler); + + httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel); httpServer.start(); if (httpServer.getHttpAddress() != null) { infoPort = httpServer.getHttpAddress().getPort(); @@ -1411,9 +1401,7 @@ public class DataNode extends ReconfigurableBase // global DN settings registerMXBean(); - initDataXceiver(); - initObjectStoreHandler(); startInfoServer(); pauseMonitor = new JvmPauseMonitor(); pauseMonitor.init(getConf()); @@ -1454,19 +1442,6 @@ public class DataNode extends ReconfigurableBase } /** - * Initializes the object store handler. This must be called before - * initialization of the HTTP server. 
- * - * @throws IOException if there is an I/O error - */ - private void initObjectStoreHandler() throws IOException { - if (this.ozoneEnabled) { - this.objectStoreHandler = new ObjectStoreHandler(getConf()); - LOG.info("ozone is enabled."); - } - } - - /** * Checks if the DataNode has a secure configuration if security is enabled. * There are 2 possible configurations that are considered secure: * 1. The server has bound to privileged ports for RPC and HTTP via @@ -1578,6 +1553,11 @@ public class DataNode extends ReconfigurableBase streamingAddr.getAddress().getHostAddress(), hostName, storage.getDatanodeUuid(), getXferPort(), getInfoPort(), infoSecurePort, getIpcPort()); + for (ServicePlugin plugin : plugins) { + if (plugin instanceof DataNodeServicePlugin) { + ((DataNodeServicePlugin) plugin).onDatanodeIdCreation(dnId); + } + } return new DatanodeRegistration(dnId, storageInfo, new ExportedBlockKeys(), VersionInfo.getVersion()); } @@ -1598,31 +1578,15 @@ public class DataNode extends ReconfigurableBase + ". 
Expecting " + storage.getDatanodeUuid()); } - if (isOzoneEnabled()) { - if (datanodeStateMachine == null) { - datanodeStateMachine = new DatanodeStateMachine( - getDatanodeId(), - getConf()); - datanodeStateMachine.startDaemon(); + for (ServicePlugin plugin : plugins) { + if (plugin instanceof DataNodeServicePlugin) { + ((DataNodeServicePlugin) plugin) + .onDatanodeSuccessfulNamenodeRegisration(bpRegistration); } } registerBlockPoolWithSecretManager(bpRegistration, blockPoolId); } - - @VisibleForTesting - public OzoneContainer getOzoneContainerManager() { - return this.datanodeStateMachine.getContainer(); - } - - @VisibleForTesting - public DatanodeStateMachine.DatanodeStates getOzoneStateMachineState() { - if (this.datanodeStateMachine != null) { - return this.datanodeStateMachine.getContext().getState(); - } - // if the state machine doesn't exist then DN initialization is in progress - return DatanodeStateMachine.DatanodeStates.INIT; - } - + /** * After the block pool has contacted the NN, registers that block pool * with the secret manager, updating it with the secrets provided by the NN. @@ -1729,11 +1693,11 @@ public class DataNode extends ReconfigurableBase BPOfferService getBPOfferService(String bpid){ return blockPoolManager.get(bpid); } - + int getBpOsCount() { return blockPoolManager.getAllNamenodeThreads().size(); } - + /** * Initializes the {@link #data}. The initialization is done only once, when * handshake with the the first namenode is completed. @@ -2021,7 +1985,7 @@ public class DataNode extends ReconfigurableBase } } } - + List<BPOfferService> bposArray = (this.blockPoolManager == null) ? 
new ArrayList<BPOfferService>() : this.blockPoolManager.getAllNamenodeThreads(); @@ -2054,6 +2018,7 @@ public class DataNode extends ReconfigurableBase // Terminate directory scanner and block scanner shutdownPeriodicScanners(); + shutdownDiskBalancer(); // Stop the web server if (httpServer != null) { @@ -2064,17 +2029,6 @@ public class DataNode extends ReconfigurableBase } } - // Stop the object store handler - if (isOzoneEnabled()) { - if (this.objectStoreHandler != null) { - this.objectStoreHandler.close(); - if(datanodeStateMachine != null && - !datanodeStateMachine.isDaemonStopped()) { - datanodeStateMachine.stopDaemon(); - } - } - } - volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS); if (storageLocationChecker != null) { @@ -2088,7 +2042,7 @@ public class DataNode extends ReconfigurableBase // shouldRun is set to false here to prevent certain threads from exiting // before the restart prep is done. this.shouldRun = false; - + // wait reconfiguration thread, if any, to exit shutdownReconfigurationTask(); @@ -2098,8 +2052,9 @@ public class DataNode extends ReconfigurableBase while (true) { // When shutting down for restart, wait 2.5 seconds before forcing // termination of receiver threads. - if (!this.shutdownForUpgrade || (this.shutdownForUpgrade && ( - Time.monotonicNow() - timeNotified > 1000))) { + if (!this.shutdownForUpgrade || + (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified + > 1000))) { this.threadGroup.interrupt(); break; } @@ -2110,8 +2065,7 @@ public class DataNode extends ReconfigurableBase } try { Thread.sleep(sleepMs); - } catch (InterruptedException e) { - } + } catch (InterruptedException e) {} sleepMs = sleepMs * 3 / 2; // exponential backoff if (sleepMs > 200) { sleepMs = 200; @@ -2137,9 +2091,9 @@ public class DataNode extends ReconfigurableBase metrics.setDataNodeActiveXceiversCount(0); } - // IPC server needs to be shutdown late in the process, otherwise - // shutdown command response won't get sent. 
- if (ipcServer != null) { + // IPC server needs to be shutdown late in the process, otherwise + // shutdown command response won't get sent. + if (ipcServer != null) { ipcServer.stop(); } @@ -2154,7 +2108,7 @@ public class DataNode extends ReconfigurableBase LOG.warn("Received exception in BlockPoolManager#shutDownAll", ie); } } - + if (storage != null) { try { this.storage.unlockAll(); @@ -2175,11 +2129,9 @@ public class DataNode extends ReconfigurableBase MBeans.unregister(dataNodeInfoBeanName); dataNodeInfoBeanName = null; } - if (shortCircuitRegistry != null) { - shortCircuitRegistry.shutdown(); - } + if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown(); LOG.info("Shutdown complete."); - synchronized (this) { + synchronized(this) { // it is already false, but setting it again to avoid a findbug warning. this.shouldRun = false; // Notify the main thread. @@ -2717,9 +2669,8 @@ public class DataNode extends ReconfigurableBase */ public static DataNode instantiateDataNode(String args [], Configuration conf, SecureResources resources) throws IOException { - if (conf == null) { - conf = new OzoneConfiguration(); - } + if (conf == null) + conf = new HdfsConfiguration(); if (args != null) { // parse generic hadoop options @@ -3252,12 +3203,6 @@ public class DataNode extends ReconfigurableBase } catch (InterruptedException ie) { } } shutdown(); - - if (isOzoneEnabled()) { - if(datanodeStateMachine != null) { - datanodeStateMachine.stopDaemon(); - } - } } }; @@ -3552,13 +3497,10 @@ public class DataNode extends ReconfigurableBase return metricsLoggerTimer; } - public boolean isOzoneEnabled() { - return ozoneEnabled; - } - public Tracer getTracer() { return tracer; } + /** * Allows submission of a disk balancer Job. * @param planID - Hash value of the plan. 
@@ -3676,4 +3618,14 @@ public class DataNode extends ReconfigurableBase } return volumeInfoList; } + + @Private + public SecureResources getSecureResources() { + return secureResources; + } + + @Private + public Collection<ServicePlugin> getPlugins() { + return Collections.unmodifiableList(plugins); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java new file mode 100644 index 0000000..08aae8b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.util.ServicePlugin; + +/** + * Datanode specific service plugin with additional hooks. + */ +public interface DataNodeServicePlugin extends ServicePlugin{ + + /** + * Extension point to modify the datanode id. + * + * @param dataNodeId the newly created datanode id + */ + default void onDatanodeIdCreation(DatanodeID dataNodeId) { + //NOOP + } + + /** + * Extension point invoked after a successful namenode registration. + * + * @param dataNodeId the datanode registration that succeeded + */ + default void onDatanodeSuccessfulNamenodeRegisration( + DatanodeRegistration dataNodeId) { + //NOOP + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java deleted file mode 100644 index a302462..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import static org.apache.hadoop.ozone.OzoneConfigKeys.*; -import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS; -import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; - -import com.sun.jersey.api.container.ContainerFactory; -import com.sun.jersey.api.core.ApplicationAdapter; - -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.ksm.protocolPB - .KeySpaceManagerProtocolClientSideTranslatorPB; -import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB; -import org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.scm.protocolPB - .ScmBlockLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.ozone.web.handlers.ServiceFilter; -import 
org.apache.hadoop.ozone.web.interfaces.StorageHandler; -import org.apache.hadoop.ozone.web.ObjectStoreApplication; -import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; -import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler; -import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Implements object store handling within the DataNode process. This class is - * responsible for initializing and maintaining the RPC clients and servers and - * the web application required for the object store implementation. - */ -public final class ObjectStoreHandler implements Closeable { - - private static final Logger LOG = - LoggerFactory.getLogger(ObjectStoreJerseyContainer.class); - - private final ObjectStoreJerseyContainer objectStoreJerseyContainer; - private final KeySpaceManagerProtocolClientSideTranslatorPB - keySpaceManagerClient; - private final StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private final ScmBlockLocationProtocolClientSideTranslatorPB - scmBlockLocationClient; - private final StorageHandler storageHandler; - - /** - * Creates a new ObjectStoreHandler. - * - * @param conf configuration - * @throws IOException if there is an I/O error - */ - public ObjectStoreHandler(Configuration conf) throws IOException { - String shType = conf.getTrimmed(OZONE_HANDLER_TYPE_KEY, - OZONE_HANDLER_TYPE_DEFAULT); - LOG.info("ObjectStoreHandler initializing with {}: {}", - OZONE_HANDLER_TYPE_KEY, shType); - boolean ozoneTrace = conf.getBoolean(OZONE_TRACE_ENABLED_KEY, - OZONE_TRACE_ENABLED_DEFAULT); - - // Initialize Jersey container for object store web application. 
- if (OzoneConsts.OZONE_HANDLER_DISTRIBUTED.equalsIgnoreCase(shType)) { - RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - long scmVersion = - RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); - - InetSocketAddress scmAddress = - OzoneClientUtils.getScmAddressForClients(conf); - this.storageContainerLocationClient = - new StorageContainerLocationProtocolClientSideTranslatorPB( - RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion, - scmAddress, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); - - InetSocketAddress scmBlockAddress = - OzoneClientUtils.getScmAddressForBlockClients(conf); - this.scmBlockLocationClient = - new ScmBlockLocationProtocolClientSideTranslatorPB( - RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion, - scmBlockAddress, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); - - RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class, - ProtobufRpcEngine.class); - long ksmVersion = - RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class); - InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf); - this.keySpaceManagerClient = - new KeySpaceManagerProtocolClientSideTranslatorPB( - RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion, - ksmAddress, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); - - storageHandler = new DistributedStorageHandler( - new OzoneConfiguration(conf), - this.storageContainerLocationClient, - this.keySpaceManagerClient); - } else { - if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) { - storageHandler = new LocalStorageHandler(conf); - this.storageContainerLocationClient = null; - this.scmBlockLocationClient = null; - this.keySpaceManagerClient = null; - } else { - throw new IllegalArgumentException( 
- String.format("Unrecognized value for %s: %s," - + " Allowed values are %s,%s", - OZONE_HANDLER_TYPE_KEY, shType, - OzoneConsts.OZONE_HANDLER_DISTRIBUTED, - OzoneConsts.OZONE_HANDLER_LOCAL)); - } - } - ApplicationAdapter aa = - new ApplicationAdapter(new ObjectStoreApplication()); - Map<String, Object> settingsMap = new HashMap<>(); - settingsMap.put(PROPERTY_CONTAINER_REQUEST_FILTERS, - ServiceFilter.class.getCanonicalName()); - settingsMap.put(FEATURE_TRACE, ozoneTrace); - aa.setPropertiesAndFeatures(settingsMap); - this.objectStoreJerseyContainer = ContainerFactory.createContainer( - ObjectStoreJerseyContainer.class, aa); - this.objectStoreJerseyContainer.setStorageHandler(storageHandler); - } - - /** - * Returns the initialized web application container. - * - * @return initialized web application container - */ - public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() { - return this.objectStoreJerseyContainer; - } - - /** - * Returns the storage handler. - * - * @return returns the storage handler - */ - public StorageHandler getStorageHandler() { - return this.storageHandler; - } - - @Override - public void close() { - LOG.info("Closing ObjectStoreHandler."); - storageHandler.close(); - IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient); - IOUtils.cleanupWithLogger(LOG, scmBlockLocationClient); - IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 1852192..c141293 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1745,7 +1745,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } assert newReplicaInfo.getState() == ReplicaState.FINALIZED : "Replica should be finalized"; - + volumeMap.add(bpid, newReplicaInfo); return newReplicaInfo; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java index dbc03c2..2e46b28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java @@ -51,11 +51,9 @@ import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider; -import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.http.RestCsrfPreventionFilter; import org.apache.hadoop.security.ssl.SSLFactory; @@ -93,19 +91,11 @@ public class 
DatanodeHttpServer implements Closeable { public DatanodeHttpServer(final Configuration conf, final DataNode datanode, - final ServerSocketChannel externalHttpChannel, - final ObjectStoreHandler objectStoreHandler) + final ServerSocketChannel externalHttpChannel) throws IOException { this.restCsrfPreventionFilter = createRestCsrfPreventionFilter(conf); this.conf = conf; - final ObjectStoreJerseyContainer finalContainer; - if (objectStoreHandler != null) { - finalContainer = objectStoreHandler.getObjectStoreJerseyContainer(); - } else { - finalContainer = null; - } - Configuration confForInfoServer = new Configuration(conf); confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS_KEY, 10); int proxyPort = @@ -160,8 +150,7 @@ public class DatanodeHttpServer implements Closeable { } p.addLast( new ChunkedWriteHandler(), - new URLDispatcher(jettyAddr, conf, confForCreate, - finalContainer)); + new URLDispatcher(jettyAddr, conf, confForCreate)); } }); @@ -218,8 +207,7 @@ public class DatanodeHttpServer implements Closeable { } p.addLast( new ChunkedWriteHandler(), - new URLDispatcher(jettyAddr, conf, confForCreate, - finalContainer)); + new URLDispatcher(jettyAddr, conf, confForCreate)); } }); } else { @@ -386,4 +374,3 @@ public class DatanodeHttpServer implements Closeable { } } } - http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java index 4958bb5..e275fb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java @@ -43,7 +43,7 @@ import org.apache.hadoop.security.http.RestCsrfPreventionFilter.HttpInteraction; * handler drops the request and immediately sends an HTTP 400 response. */ @InterfaceAudience.Private -final class RestCsrfPreventionFilterHandler +public final class RestCsrfPreventionFilterHandler extends SimpleChannelInboundHandler<HttpRequest> { private static final Log LOG = DatanodeHttpServer.LOG; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java index dd958d1..8ec5bf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java @@ -17,106 +17,42 @@ */ package org.apache.hadoop.hdfs.server.datanode.web; -import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; - import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpRequest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler; -import org.apache.hadoop.ozone.client.rest.headers.Header; -import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; -import org.apache.hadoop.ozone.web.netty.RequestDispatchObjectStoreChannelHandler; - -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.InetSocketAddress; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; + class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> { - protected static final Logger LOG = - LoggerFactory.getLogger(URLDispatcher.class); private final InetSocketAddress proxyHost; private final Configuration conf; private final Configuration confForCreate; - private final ObjectStoreJerseyContainer objectStoreJerseyContainer; URLDispatcher(InetSocketAddress proxyHost, Configuration conf, - Configuration confForCreate, - ObjectStoreJerseyContainer objectStoreJerseyContainer) - throws IOException { + Configuration confForCreate) { this.proxyHost = proxyHost; this.conf = conf; this.confForCreate = confForCreate; - this.objectStoreJerseyContainer = objectStoreJerseyContainer; } @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws Exception { + String uri = req.getUri(); ChannelPipeline p = ctx.pipeline(); - if (isWebHdfsRequest(req)) { + if (uri.startsWith(WEBHDFS_PREFIX)) { WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate); p.replace(this, WebHdfsHandler.class.getSimpleName(), h); h.channelRead0(ctx, req); - } else if (isObjectStoreRequest(req)) { - RequestDispatchObjectStoreChannelHandler h = - new RequestDispatchObjectStoreChannelHandler( - this.objectStoreJerseyContainer); - p.replace(this, - RequestDispatchObjectStoreChannelHandler.class.getSimpleName(), h); - h.channelRead0(ctx, req); - } else if (!isObjectStoreRequestHeaders(req)){ + } else { SimpleHttpProxyHandler h = new SimpleHttpProxyHandler(proxyHost); p.replace(this, SimpleHttpProxyHandler.class.getSimpleName(), h); h.channelRead0(ctx, req); } } - - - /* - * Returns true if the request has ozone headers - * - * @param req HTTP request - * @return true if request has ozone header, else false - */ - - private boolean isObjectStoreRequestHeaders(HttpRequest 
req) { - for (String version : req.headers().getAll(Header.OZONE_VERSION_HEADER)) { - if (version != null) { - LOG.debug("ozone : dispatching call to Ozone, when security is not " + - "enabled"); - return true; - } - } - return false; - } - - - /* - * Returns true if the request is to be handled by the object store. - * - * @param req HTTP request - * @return true if the request is to be handled by the object store - */ - private boolean isObjectStoreRequest(HttpRequest req) { - if (this.objectStoreJerseyContainer == null) { - LOG.debug("ozone : ozone is disabled or when security is enabled, ozone" + - " is not supported"); - return false; - } - return isObjectStoreRequestHeaders(req); - } - - /** - * Returns true if the request is to be handled by WebHDFS. - * - * @param req HTTP request - * @return true if the request is to be handled by WebHDFS - */ - private boolean isWebHdfsRequest(HttpRequest req) { - return req.getUri().startsWith(WEBHDFS_PREFIX); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java index 2debf2e..10650da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java @@ -34,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * NamespaceInfo is returned by the name-node in reply + * NamespaceInfo is returned by the name-node in reply * to a data-node handshake. 
- * + * */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -110,7 +110,7 @@ public class NamespaceInfo extends StorageInfo { this.capabilities = capabilities; } - public NamespaceInfo(int nsID, String clusterID, String bpID, + public NamespaceInfo(int nsID, String clusterID, String bpID, long cT) { this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(), VersionInfo.getVersion()); @@ -122,7 +122,7 @@ public class NamespaceInfo extends StorageInfo { VersionInfo.getVersion()); this.state = st; } - + public long getCapabilities() { return capabilities; } @@ -151,7 +151,7 @@ public class NamespaceInfo extends StorageInfo { public String getBlockPoolID() { return blockPoolID; } - + public String getSoftwareVersion() { return softwareVersion; } @@ -194,4 +194,4 @@ public class NamespaceInfo extends StorageInfo { "BPID=" + storage.getBlockPoolID() + "."); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java index a1ea3fb..aaa1038 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java @@ -26,7 +26,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Collection; import java.util.Set; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -36,8 +35,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; -import 
org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; @@ -81,10 +78,6 @@ public class GetConf extends Configured implements Tool { "gets the exclude file path that defines the datanodes " + "that need to decommissioned."), NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"), - KEYSPACEMANAGER("-keyspacemanagers", - "gets list of ozone key space manager nodes in the cluster"), - STORAGECONTAINERMANAGER("-storagecontainermanagers", - "gets list of ozone storage container manager nodes in the cluster"), CONFKEY("-confKey [key]", "gets a specific key from the configuration"); private static final Map<String, CommandHandler> map; @@ -104,10 +97,6 @@ public class GetConf extends Configured implements Tool { new CommandHandler(DFSConfigKeys.DFS_HOSTS_EXCLUDE)); map.put(StringUtils.toLowerCase(NNRPCADDRESSES.getName()), new NNRpcAddressesCommandHandler()); - map.put(StringUtils.toLowerCase(KEYSPACEMANAGER.getName()), - new KeySpaceManagersCommandHandler()); - map.put(StringUtils.toLowerCase(STORAGECONTAINERMANAGER.getName()), - new StorageContainerManagersCommandHandler()); map.put(StringUtils.toLowerCase(CONFKEY.getName()), new PrintConfKeyCommandHandler()); } @@ -235,36 +224,9 @@ public class GetConf extends Configured implements Tool { * Handler for {@link Command#SECONDARY} */ static class SecondaryNameNodesCommandHandler extends CommandHandler { - @Override public int doWorkInternal(GetConf tool, String[] args) - throws IOException { - tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf())); - return 0; - } - } - - /** - * Handler for {@link Command#STORAGECONTAINERMANAGER}. 
- */ - static class StorageContainerManagersCommandHandler extends CommandHandler { @Override - public int doWorkInternal(GetConf tool, String[] args) throws IOException { - Collection<InetSocketAddress> addresses = - OzoneClientUtils.getSCMAddresses(tool.getConf()); - for (InetSocketAddress addr : addresses) { - tool.printOut(addr.getHostName()); - } - return 0; - } - } - - /** - * Handler for {@link Command#KEYSPACEMANAGER}. - */ - static class KeySpaceManagersCommandHandler extends CommandHandler { - @Override - public int doWorkInternal(GetConf tool, String[] args) throws IOException { - tool.printOut(OzoneClientUtils.getKsmAddress(tool.getConf()) - .getHostName()); + public int doWorkInternal(GetConf tool, String []args) throws IOException { + tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf())); return 0; } } @@ -395,11 +357,8 @@ public class GetConf extends Configured implements Tool { if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) { System.exit(0); } - - Configuration conf = new Configuration(); - conf.addResource(new HdfsConfiguration()); - conf.addResource(new OzoneConfiguration()); - int res = ToolRunner.run(new GetConf(conf), args); + + int res = ToolRunner.run(new GetConf(new HdfsConfiguration()), args); System.exit(res); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java deleted file mode 100644 index c3f9234..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.common; - -import java.io.File; -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * The exception is thrown when file system state is inconsistent - * and is not recoverable. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class InconsistentStorageStateException extends IOException { - private static final long serialVersionUID = 1L; - - public InconsistentStorageStateException(String descr) { - super(descr); - } - - public InconsistentStorageStateException(File dir, String descr) { - super("Directory " + getFilePath(dir) + " is in an inconsistent state: " - + descr); - } - - private static String getFilePath(File dir) { - try { - return dir.getCanonicalPath(); - } catch (IOException e) { - } - return dir.getPath(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java deleted file mode 100644 index cfcddb4..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.common; - -import java.io.File; -import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Properties; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType; -import org.apache.hadoop.util.Time; - -/** - * Storage information file. This Class defines the methods to check - * the consistency of the storage dir and the version file. - * <p> - * Local storage information is stored in a separate file VERSION. - * It contains type of the node, - * the storage layout version, the SCM id, and - * the KSM/SCM state creation time. - * - */ -@InterfaceAudience.Private -public abstract class Storage { - private static final Logger LOG = LoggerFactory.getLogger(Storage.class); - - protected static final String STORAGE_DIR_CURRENT = "current"; - protected static final String STORAGE_FILE_VERSION = "VERSION"; - - private final NodeType nodeType; - private final File root; - private final File storageDir; - - private StorageState state; - private StorageInfo storageInfo; - - - /** - * Determines the state of the Version file. - */ - public enum StorageState { - NON_EXISTENT, NOT_INITIALIZED, INITIALIZED - } - - public Storage(NodeType type, File root, String sdName) - throws IOException { - this.nodeType = type; - this.root = root; - this.storageDir = new File(root, sdName); - this.state = getStorageState(); - if (state == StorageState.INITIALIZED) { - this.storageInfo = new StorageInfo(type, getVersionFile()); - } else { - this.storageInfo = new StorageInfo( - nodeType, StorageInfo.newClusterID(), Time.now()); - setNodeProperties(); - } - } - - /** - * Gets the path of the Storage dir. 
- * @return Stoarge dir path - */ - public String getStorageDir() { - return storageDir.getAbsoluteFile().toString(); - } - - /** - * Gets the state of the version file. - * @return the state of the Version file - */ - public StorageState getState() { - return state; - } - - public NodeType getNodeType() { - return storageInfo.getNodeType(); - } - - public String getClusterID() { - return storageInfo.getClusterID(); - } - - public long getCreationTime() { - return storageInfo.getCreationTime(); - } - - public void setClusterId(String clusterId) throws IOException { - if (state == StorageState.INITIALIZED) { - throw new IOException( - "Storage directory " + storageDir + " already initialized."); - } else { - storageInfo.setClusterId(clusterId); - } - } - - /** - * Retreives the storageInfo instance to read/write the common - * version file properties. - * @return the instance of the storageInfo class - */ - protected StorageInfo getStorageInfo() { - return storageInfo; - } - - abstract protected Properties getNodeProperties(); - - /** - * Sets the Node properties spaecific to KSM/SCM. - */ - private void setNodeProperties() { - Properties nodeProperties = getNodeProperties(); - if (nodeProperties != null) { - for (String key : nodeProperties.stringPropertyNames()) { - storageInfo.setProperty(key, nodeProperties.getProperty(key)); - } - } - } - - /** - * Directory {@code current} contains latest files defining - * the file system meta-data. - * - * @return the directory path - */ - private File getCurrentDir() { - return new File(storageDir, STORAGE_DIR_CURRENT); - } - - /** - * File {@code VERSION} contains the following fields: - * <ol> - * <li>node type</li> - * <li>KSM/SCM state creation time</li> - * <li>other fields specific for this node type</li> - * </ol> - * The version file is always written last during storage directory updates. 
- * The existence of the version file indicates that all other files have - * been successfully written in the storage directory, the storage is valid - * and does not need to be recovered. - * - * @return the version file path - */ - private File getVersionFile() { - return new File(getCurrentDir(), STORAGE_FILE_VERSION); - } - - - /** - * Check to see if current/ directory is empty. This method is used - * before determining to format the directory. - * @throws IOException if unable to list files under the directory. - */ - private void checkEmptyCurrent() throws IOException { - File currentDir = getCurrentDir(); - if (!currentDir.exists()) { - // if current/ does not exist, it's safe to format it. - return; - } - try (DirectoryStream<Path> dirStream = Files - .newDirectoryStream(currentDir.toPath())) { - if (dirStream.iterator().hasNext()) { - throw new InconsistentStorageStateException(getCurrentDir(), - "Can't initialize the storage directory because the current " - + "it is not empty."); - } - } - } - - /** - * Check consistency of the storage directory. 
- * - * @return state {@link StorageState} of the storage directory - * @throws IOException - */ - private StorageState getStorageState() throws IOException { - assert root != null : "root is null"; - String rootPath = root.getCanonicalPath(); - try { // check that storage exists - if (!root.exists()) { - // storage directory does not exist - LOG.warn("Storage directory " + rootPath + " does not exist"); - return StorageState.NON_EXISTENT; - } - // or is inaccessible - if (!root.isDirectory()) { - LOG.warn(rootPath + "is not a directory"); - return StorageState.NON_EXISTENT; - } - if (!FileUtil.canWrite(root)) { - LOG.warn("Cannot access storage directory " + rootPath); - return StorageState.NON_EXISTENT; - } - } catch (SecurityException ex) { - LOG.warn("Cannot access storage directory " + rootPath, ex); - return StorageState.NON_EXISTENT; - } - - // check whether current directory is valid - File versionFile = getVersionFile(); - boolean hasCurrent = versionFile.exists(); - - if (hasCurrent) { - return StorageState.INITIALIZED; - } else { - checkEmptyCurrent(); - return StorageState.NOT_INITIALIZED; - } - } - - /** - * Creates the Version file if not present, - * otherwise returns with IOException. 
- * @throws IOException - */ - public void initialize() throws IOException { - if (state == StorageState.INITIALIZED) { - throw new IOException("Storage directory already initialized."); - } - if (!getCurrentDir().mkdirs()) { - throw new IOException("Cannot create directory " + getCurrentDir()); - } - storageInfo.writeTo(getVersionFile()); - } - -} - http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java deleted file mode 100644 index 82ad955..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.common; - -import java.io.IOException; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.File; -import java.io.RandomAccessFile; - -import java.util.Properties; -import java.util.UUID; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType; - -/** - * Common class for storage information. This class defines the common - * properties and functions to set them , write them into the version file - * and read them from the version file. - * - */ -@InterfaceAudience.Private -public class StorageInfo { - - private Properties properties = new Properties(); - - /** - * Property to hold node type. - */ - private static final String NODE_TYPE = "nodeType"; - /** - * Property to hold ID of the cluster. - */ - private static final String CLUSTER_ID = "clusterID"; - /** - * Property to hold creation time of the storage. - */ - private static final String CREATION_TIME = "cTime"; - - /** - * Constructs StorageInfo instance. 
- * @param type - * Type of the node using the storage - * @param cid - * Cluster ID - * @param cT - * Cluster creation Time - - * @throws IOException - */ - public StorageInfo(NodeType type, String cid, long cT) - throws IOException { - Preconditions.checkNotNull(type); - Preconditions.checkNotNull(cid); - Preconditions.checkNotNull(cT); - properties.setProperty(NODE_TYPE, type.name()); - properties.setProperty(CLUSTER_ID, cid); - properties.setProperty(CREATION_TIME, String.valueOf(cT)); - } - - public StorageInfo(NodeType type, File propertiesFile) - throws IOException { - this.properties = readFrom(propertiesFile); - verifyNodeType(type); - verifyClusterId(); - verifyCreationTime(); - } - - public NodeType getNodeType() { - return NodeType.valueOf(properties.getProperty(NODE_TYPE)); - } - - public String getClusterID() { - return properties.getProperty(CLUSTER_ID); - } - - public Long getCreationTime() { - String creationTime = properties.getProperty(CREATION_TIME); - if(creationTime != null) { - return Long.parseLong(creationTime); - } - return null; - } - - public String getProperty(String key) { - return properties.getProperty(key); - } - - public void setProperty(String key, String value) { - properties.setProperty(key, value); - } - - public void setClusterId(String clusterId) { - properties.setProperty(CLUSTER_ID, clusterId); - } - - private void verifyNodeType(NodeType type) - throws InconsistentStorageStateException { - NodeType nodeType = getNodeType(); - Preconditions.checkNotNull(nodeType); - if(type != nodeType) { - throw new InconsistentStorageStateException("Expected NodeType: " + type + - ", but found: " + nodeType); - } - } - - private void verifyClusterId() - throws InconsistentStorageStateException { - String clusterId = getClusterID(); - Preconditions.checkNotNull(clusterId); - if(clusterId.isEmpty()) { - throw new InconsistentStorageStateException("Cluster ID not found"); - } - } - - private void verifyCreationTime() { - Long creationTime = 
getCreationTime(); - Preconditions.checkNotNull(creationTime); - } - - - public void writeTo(File to) - throws IOException { - try (RandomAccessFile file = new RandomAccessFile(to, "rws"); - FileOutputStream out = new FileOutputStream(file.getFD())) { - file.seek(0); - /* - * If server is interrupted before this line, - * the version file will remain unchanged. - */ - properties.store(out, null); - /* - * Now the new fields are flushed to the head of the file, but file - * length can still be larger then required and therefore the file can - * contain whole or corrupted fields from its old contents in the end. - * If server is interrupted here and restarted later these extra fields - * either should not effect server behavior or should be handled - * by the server correctly. - */ - file.setLength(out.getChannel().position()); - } - } - - private Properties readFrom(File from) throws IOException { - try (RandomAccessFile file = new RandomAccessFile(from, "rws"); - FileInputStream in = new FileInputStream(file.getFD())) { - Properties props = new Properties(); - file.seek(0); - props.load(in); - return props; - } - } - - /** - * Generate new clusterID. - * - * clusterID is a persistent attribute of the cluster. - * It is generated when the cluster is created and remains the same - * during the life cycle of the cluster. When a new SCM node is initialized, - * if this is a new cluster, a new clusterID is generated and stored. 
- * @return new clusterID - */ - public static String newClusterID() { - return "CID-" + UUID.randomUUID().toString(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java deleted file mode 100644 index 6517e58..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.common; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java deleted file mode 100644 index 690e3b5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.helpers; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; - -import java.io.IOException; -import java.util.Map; -import java.util.TreeMap; - -/** - * Java class that represents ChunkInfo ProtoBuf class. 
This helper class allows - * us to convert to and from protobuf to normal java. - */ -public class ChunkInfo { - private final String chunkName; - private final long offset; - private final long len; - private String checksum; - private final Map<String, String> metadata; - - - /** - * Constructs a ChunkInfo. - * - * @param chunkName - File Name where chunk lives. - * @param offset - offset where Chunk Starts. - * @param len - Length of the Chunk. - */ - public ChunkInfo(String chunkName, long offset, long len) { - this.chunkName = chunkName; - this.offset = offset; - this.len = len; - this.metadata = new TreeMap<>(); - } - - /** - * Adds metadata. - * - * @param key - Key Name. - * @param value - Value. - * @throws IOException - */ - public void addMetadata(String key, String value) throws IOException { - synchronized (this.metadata) { - if (this.metadata.containsKey(key)) { - throw new IOException("This key already exists. Key " + key); - } - metadata.put(key, value); - } - } - - /** - * Gets a Chunkinfo class from the protobuf definitions. - * - * @param info - Protobuf class - * @return ChunkInfo - * @throws IOException - */ - public static ChunkInfo getFromProtoBuf(ContainerProtos.ChunkInfo info) - throws IOException { - Preconditions.checkNotNull(info); - - ChunkInfo chunkInfo = new ChunkInfo(info.getChunkName(), info.getOffset(), - info.getLen()); - - for (int x = 0; x < info.getMetadataCount(); x++) { - chunkInfo.addMetadata(info.getMetadata(x).getKey(), - info.getMetadata(x).getValue()); - } - - - if (info.hasChecksum()) { - chunkInfo.setChecksum(info.getChecksum()); - } - return chunkInfo; - } - - /** - * Returns a ProtoBuf Message from ChunkInfo. 
- * - * @return Protocol Buffer Message - */ - public ContainerProtos.ChunkInfo getProtoBufMessage() { - ContainerProtos.ChunkInfo.Builder builder = ContainerProtos - .ChunkInfo.newBuilder(); - - builder.setChunkName(this.getChunkName()); - builder.setOffset(this.getOffset()); - builder.setLen(this.getLen()); - if (this.getChecksum() != null && !this.getChecksum().isEmpty()) { - builder.setChecksum(this.getChecksum()); - } - - for (Map.Entry<String, String> entry : metadata.entrySet()) { - OzoneProtos.KeyValue.Builder keyValBuilder = - OzoneProtos.KeyValue.newBuilder(); - builder.addMetadata(keyValBuilder.setKey(entry.getKey()) - .setValue(entry.getValue()).build()); - } - - return builder.build(); - } - - /** - * Returns the chunkName. - * - * @return - String - */ - public String getChunkName() { - return chunkName; - } - - /** - * Gets the start offset of the given chunk in physical file. - * - * @return - long - */ - public long getOffset() { - return offset; - } - - /** - * Returns the length of the Chunk. - * - * @return long - */ - public long getLen() { - return len; - } - - /** - * Returns the SHA256 value of this chunk. - * - * @return - Hash String - */ - public String getChecksum() { - return checksum; - } - - /** - * Sets the Hash value of this chunk. - * - * @param checksum - Hash String. - */ - public void setChecksum(String checksum) { - this.checksum = checksum; - } - - /** - * Returns Metadata associated with this Chunk. - * - * @return - Map of Key,values. 
- */ - public Map<String, String> getMetadata() { - return metadata; - } - - @Override - public String toString() { - return "ChunkInfo{" + - "chunkName='" + chunkName + - ", offset=" + offset + - ", len=" + len + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java deleted file mode 100644 index 4eb211b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.container.common.helpers; - -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.FileLock; -import java.nio.file.StandardOpenOption; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.concurrent.ExecutionException; - -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.CHECKSUM_MISMATCH; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.CONTAINER_NOT_FOUND; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.INVALID_WRITE_SIZE; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.IO_EXCEPTION; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.OVERWRITE_FLAG_REQUIRED; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.UNABLE_TO_FIND_CHUNK; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.UNABLE_TO_FIND_DATA_DIR; - -/** - * Set of utility functions used by the chunk Manager. 
- */ -public final class ChunkUtils { - - /* Never constructed. */ - private ChunkUtils() { - } - - /** - * Checks if we are getting a request to overwrite an existing range of - * chunk. - * - * @param chunkFile - File - * @param chunkInfo - Buffer to write - * @return bool - */ - public static boolean isOverWriteRequested(File chunkFile, ChunkInfo - chunkInfo) { - - if (!chunkFile.exists()) { - return false; - } - - long offset = chunkInfo.getOffset(); - return offset < chunkFile.length(); - } - - /** - * Overwrite is permitted if an only if the user explicitly asks for it. We - * permit this iff the key/value pair contains a flag called - * [OverWriteRequested, true]. - * - * @param chunkInfo - Chunk info - * @return true if the user asks for it. - */ - public static boolean isOverWritePermitted(ChunkInfo chunkInfo) { - String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE); - return (overWrite != null) && - (!overWrite.isEmpty()) && - (Boolean.valueOf(overWrite)); - } - - /** - * Validates chunk data and returns a file object to Chunk File that we are - * expected to write data to. - * - * @param pipeline - pipeline. - * @param data - container data. - * @param info - chunk info. - * @return File - * @throws StorageContainerException - */ - public static File validateChunk(Pipeline pipeline, ContainerData data, - ChunkInfo info) throws StorageContainerException { - - Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - - File chunkFile = getChunkFile(pipeline, data, info); - if (ChunkUtils.isOverWriteRequested(chunkFile, info)) { - if (!ChunkUtils.isOverWritePermitted(info)) { - log.error("Rejecting write chunk request. Chunk overwrite " + - "without explicit request. {}", info.toString()); - throw new StorageContainerException("Rejecting write chunk request. " + - "OverWrite flag required." + info.toString(), - OVERWRITE_FLAG_REQUIRED); - } - } - return chunkFile; - } - - /** - * Validates that Path to chunk file exists. 
- * - * @param pipeline - Container Info. - * @param data - Container Data - * @param info - Chunk info - * @return - File. - * @throws StorageContainerException - */ - public static File getChunkFile(Pipeline pipeline, ContainerData data, - ChunkInfo info) throws StorageContainerException { - - Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - if (data == null) { - log.error("Invalid container Name: {}", pipeline.getContainerName()); - throw new StorageContainerException("Unable to find the container Name:" + - " " + - pipeline.getContainerName(), CONTAINER_NOT_FOUND); - } - - File dataDir = ContainerUtils.getDataDirectory(data).toFile(); - if (!dataDir.exists()) { - log.error("Unable to find the data directory: {}", dataDir); - throw new StorageContainerException("Unable to find the data directory:" + - " " + dataDir, UNABLE_TO_FIND_DATA_DIR); - } - - return dataDir.toPath().resolve(info.getChunkName()).toFile(); - - } - - /** - * Writes the data in chunk Info to the specified location in the chunkfile. - * - * @param chunkFile - File to write data to. - * @param chunkInfo - Data stream to write. - * @param data - The data buffer. - * @throws StorageContainerException - */ - public static void writeData(File chunkFile, ChunkInfo chunkInfo, - byte[] data) throws - StorageContainerException, ExecutionException, InterruptedException, - NoSuchAlgorithmException { - - Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - if (data.length != chunkInfo.getLen()) { - String err = String.format("data array does not match the length " + - "specified. 
DataLen: %d Byte Array: %d", - chunkInfo.getLen(), data.length); - log.error(err); - throw new StorageContainerException(err, INVALID_WRITE_SIZE); - } - - AsynchronousFileChannel file = null; - FileLock lock = null; - - try { - file = - AsynchronousFileChannel.open(chunkFile.toPath(), - StandardOpenOption.CREATE, - StandardOpenOption.WRITE, - StandardOpenOption.SPARSE, - StandardOpenOption.SYNC); - lock = file.lock().get(); - if (chunkInfo.getChecksum() != null && - !chunkInfo.getChecksum().isEmpty()) { - verifyChecksum(chunkInfo, data, log); - } - int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); - if (size != data.length) { - log.error("Invalid write size found. Size:{} Expected: {} ", size, - data.length); - throw new StorageContainerException("Invalid write size found. " + - "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE); - } - } catch (IOException e) { - throw new StorageContainerException(e, IO_EXCEPTION); - - } finally { - if (lock != null) { - try { - lock.release(); - } catch (IOException e) { - log.error("Unable to release lock ??, Fatal Error."); - throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR); - - } - } - if (file != null) { - try { - file.close(); - } catch (IOException e) { - throw new StorageContainerException("Error closing chunk file", - e, CONTAINER_INTERNAL_ERROR); - } - } - } - } - - /** - * Verifies the checksum of a chunk against the data buffer. - * - * @param chunkInfo - Chunk Info. - * @param data - data buffer - * @param log - log - * @throws NoSuchAlgorithmException - * @throws StorageContainerException - */ - private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger - log) throws NoSuchAlgorithmException, StorageContainerException { - MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha.update(data); - if (!Hex.encodeHexString(sha.digest()).equals( - chunkInfo.getChecksum())) { - log.error("Checksum mismatch. 
Provided: {} , computed: {}", - chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest())); - throw new StorageContainerException("Checksum mismatch. Provided: " + - chunkInfo.getChecksum() + " , computed: " + - DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH); - } - } - - /** - * Reads data from an existing chunk file. - * - * @param chunkFile - file where data lives. - * @param data - chunk definition. - * @return ByteBuffer - * @throws StorageContainerException - * @throws ExecutionException - * @throws InterruptedException - */ - public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws - StorageContainerException, ExecutionException, InterruptedException, - NoSuchAlgorithmException { - Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - - if (!chunkFile.exists()) { - log.error("Unable to find the chunk file. chunk info : {}", - data.toString()); - throw new StorageContainerException("Unable to find the chunk file. " + - "chunk info " + - data.toString(), UNABLE_TO_FIND_CHUNK); - } - - AsynchronousFileChannel file = null; - FileLock lock = null; - try { - file = - AsynchronousFileChannel.open(chunkFile.toPath(), - StandardOpenOption.READ); - lock = file.lock(data.getOffset(), data.getLen(), true).get(); - - ByteBuffer buf = ByteBuffer.allocate((int) data.getLen()); - file.read(buf, data.getOffset()).get(); - - if (data.getChecksum() != null && !data.getChecksum().isEmpty()) { - verifyChecksum(data, buf.array(), log); - } - - return buf; - } catch (IOException e) { - throw new StorageContainerException(e, IO_EXCEPTION); - } finally { - if (lock != null) { - try { - lock.release(); - } catch (IOException e) { - log.error("I/O error is lock release."); - } - } - if (file != null) { - IOUtils.closeStream(file); - } - } - } - - /** - * Returns a CreateContainer Response. This call is used by create and delete - * containers which have null success responses. - * - * @param msg Request - * @return Response. 
- */ - public static ContainerProtos.ContainerCommandResponseProto - getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) { - return ContainerUtils.getContainerResponse(msg); - } - - /** - * Gets a response to the read chunk calls. - * - * @param msg - Msg - * @param data - Data - * @param info - Info - * @return Response. - */ - public static ContainerProtos.ContainerCommandResponseProto - getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg, - byte[] data, ChunkInfo info) { - Preconditions.checkNotNull(msg); - - ContainerProtos.ReadChunkResponseProto.Builder response = - ContainerProtos.ReadChunkResponseProto.newBuilder(); - response.setChunkData(info.getProtoBufMessage()); - response.setData(ByteString.copyFrom(data)); - response.setPipeline(msg.getReadChunk().getPipeline()); - - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getContainerResponse(msg, ContainerProtos.Result - .SUCCESS, ""); - builder.setReadChunk(response); - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java deleted file mode 100644 index d0886e1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.helpers; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.util.Time; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -/** - * This class maintains the information about a container in the ozone world. - * <p> - * A container is a name, along with metadata- which is a set of key value - * pair. - */ -public class ContainerData { - - private final String containerName; - private final Map<String, String> metadata; - private String dbPath; // Path to Level DB Store. - // Path to Physical file system where container and checksum are stored. - private String containerFilePath; - private String hash; - private AtomicLong bytesUsed; - private long maxSize; - private Long containerID; - private OzoneProtos.LifeCycleState state; - - /** - * Constructs a ContainerData Object. 
- * - * @param containerName - Name - */ - public ContainerData(String containerName, Long containerID, - Configuration conf) { - this.metadata = new TreeMap<>(); - this.containerName = containerName; - this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, - ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB; - this.bytesUsed = new AtomicLong(0L); - this.containerID = containerID; - this.state = OzoneProtos.LifeCycleState.OPEN; - } - - /** - * Constructs a ContainerData object from ProtoBuf classes. - * - * @param protoData - ProtoBuf Message - * @throws IOException - */ - public static ContainerData getFromProtBuf( - ContainerProtos.ContainerData protoData, Configuration conf) - throws IOException { - ContainerData data = new ContainerData(protoData.getName(), - protoData.getContainerID(), conf); - for (int x = 0; x < protoData.getMetadataCount(); x++) { - data.addMetadata(protoData.getMetadata(x).getKey(), - protoData.getMetadata(x).getValue()); - } - - if (protoData.hasContainerPath()) { - data.setContainerPath(protoData.getContainerPath()); - } - - if (protoData.hasDbPath()) { - data.setDBPath(protoData.getDbPath()); - } - - if (protoData.hasState()) { - data.setState(protoData.getState()); - } - - if(protoData.hasHash()) { - data.setHash(protoData.getHash()); - } - - if (protoData.hasBytesUsed()) { - data.setBytesUsed(protoData.getBytesUsed()); - } - - if (protoData.hasSize()) { - data.setMaxSize(protoData.getSize()); - } - return data; - } - - /** - * Returns a ProtoBuf Message from ContainerData. 
- * - * @return Protocol Buffer Message - */ - public ContainerProtos.ContainerData getProtoBufMessage() { - ContainerProtos.ContainerData.Builder builder = ContainerProtos - .ContainerData.newBuilder(); - builder.setName(this.getContainerName()); - builder.setContainerID(this.getContainerID()); - - if (this.getDBPath() != null) { - builder.setDbPath(this.getDBPath()); - } - - if (this.getHash() != null) { - builder.setHash(this.getHash()); - } - - if (this.getContainerPath() != null) { - builder.setContainerPath(this.getContainerPath()); - } - - builder.setState(this.getState()); - - for (Map.Entry<String, String> entry : metadata.entrySet()) { - OzoneProtos.KeyValue.Builder keyValBuilder = - OzoneProtos.KeyValue.newBuilder(); - builder.addMetadata(keyValBuilder.setKey(entry.getKey()) - .setValue(entry.getValue()).build()); - } - - if (this.getBytesUsed() >= 0) { - builder.setBytesUsed(this.getBytesUsed()); - } - - if (this.getKeyCount() >= 0) { - builder.setKeyCount(this.getKeyCount()); - } - - if (this.getMaxSize() >= 0) { - builder.setSize(this.getMaxSize()); - } - - return builder.build(); - } - - /** - * Returns the name of the container. - * - * @return - name - */ - public String getContainerName() { - return containerName; - } - - /** - * Adds metadata. - */ - public void addMetadata(String key, String value) throws IOException { - synchronized (this.metadata) { - if (this.metadata.containsKey(key)) { - throw new IOException("This key already exists. Key " + key); - } - metadata.put(key, value); - } - } - - /** - * Returns all metadata. - */ - public Map<String, String> getAllMetadata() { - synchronized (this.metadata) { - return Collections.unmodifiableMap(this.metadata); - } - } - - /** - * Returns value of a key. - */ - public String getValue(String key) { - synchronized (this.metadata) { - return metadata.get(key); - } - } - - /** - * Deletes a metadata entry from the map. 
- * - * @param key - Key - */ - public void deleteKey(String key) { - synchronized (this.metadata) { - metadata.remove(key); - } - } - - /** - * Returns path. - * - * @return - path - */ - public String getDBPath() { - return dbPath; - } - - /** - * Sets path. - * - * @param path - String. - */ - public void setDBPath(String path) { - this.dbPath = path; - } - - /** - * This function serves as the generic key for ContainerCache class. Both - * ContainerData and ContainerKeyData overrides this function to appropriately - * return the right name that can be used in ContainerCache. - * - * @return String Name. - */ - public String getName() { - return getContainerName(); - } - - /** - * Get container file path. - * @return - Physical path where container file and checksum is stored. - */ - public String getContainerPath() { - return containerFilePath; - } - - /** - * Set container Path. - * @param containerPath - File path. - */ - public void setContainerPath(String containerPath) { - this.containerFilePath = containerPath; - } - - /** - * Get container ID. - * @return - container ID. - */ - public synchronized Long getContainerID() { - return containerID; - } - - public synchronized void setState(OzoneProtos.LifeCycleState state) { - this.state = state; - } - - public synchronized OzoneProtos.LifeCycleState getState() { - return this.state; - } - - /** - * checks if the container is open. - * @return - boolean - */ - public synchronized boolean isOpen() { - return OzoneProtos.LifeCycleState.OPEN == state; - } - - /** - * Marks this container as closed. - */ - public synchronized void closeContainer() { - // TODO: closed or closing here - setState(OzoneProtos.LifeCycleState.CLOSED); - - // Some thing brain dead for now. name + Time stamp of when we get the close - // container message. - setHash(DigestUtils.sha256Hex(this.getContainerName() + - Long.toString(Time.monotonicNow()))); - } - - /** - * Final hash for this container. 
- * @return - Hash - */ - public String getHash() { - return hash; - } - - public void setHash(String hash) { - this.hash = hash; - } - - public void setMaxSize(long maxSize) { - this.maxSize = maxSize; - } - - public long getMaxSize() { - return maxSize; - } - - public long getKeyCount() { - return metadata.size(); - } - - public void setBytesUsed(long used) { - this.bytesUsed.set(used); - } - - public long addBytesUsed(long delta) { - return this.bytesUsed.addAndGet(delta); - } - - public long getBytesUsed() { - return bytesUsed.get(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
