http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java deleted file mode 100644 index 7c950dc..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.impl; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; -import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; -import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.security.NoSuchAlgorithmException; -import java.util.concurrent.ExecutionException; - -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.UNSUPPORTED_REQUEST; - -/** - * An implementation of ChunkManager that is used by default in ozone. - */ -public class ChunkManagerImpl implements ChunkManager { - static final Logger LOG = - LoggerFactory.getLogger(ChunkManagerImpl.class); - - private final ContainerManager containerManager; - - /** - * Constructs a ChunkManager. - * - * @param manager - ContainerManager. - */ - public ChunkManagerImpl(ContainerManager manager) { - this.containerManager = manager; - } - - /** - * writes a given chunk. - * - * @param pipeline - Name and the set of machines that make this container. - * @param keyName - Name of the Key. - * @param info - ChunkInfo. - * @throws StorageContainerException - */ - @Override - public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, - byte[] data, ContainerProtos.Stage stage) - throws StorageContainerException { - // we don't want container manager to go away while we are writing chunks. - containerManager.readLock(); - - // TODO : Take keyManager Write lock here. - try { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - String containerName = pipeline.getContainerName(); - Preconditions.checkNotNull(containerName, - "Container name cannot be null"); - ContainerData container = - containerManager.readContainer(containerName); - File chunkFile = ChunkUtils.validateChunk(pipeline, container, info); - File tmpChunkFile = getTmpChunkFile(chunkFile, info); - - LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file", - info.getChunkName(), stage, chunkFile, tmpChunkFile); - switch (stage) { - case WRITE_DATA: - ChunkUtils.writeData(tmpChunkFile, info, data); - break; - case COMMIT_DATA: - commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen()); - break; - case COMBINED: - // directly write to the chunk file - long oldSize = chunkFile.length(); - ChunkUtils.writeData(chunkFile, info, data); - long newSize = chunkFile.length(); - containerManager.incrBytesUsed(containerName, newSize - oldSize); - containerManager.incrWriteCount(containerName); - containerManager.incrWriteBytes(containerName, info.getLen()); - break; - } - } catch (ExecutionException | NoSuchAlgorithmException | IOException e) { - LOG.error("write data failed. error: {}", e); - throw new StorageContainerException("Internal error: ", e, - CONTAINER_INTERNAL_ERROR); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("write data failed. error: {}", e); - throw new StorageContainerException("Internal error: ", e, - CONTAINER_INTERNAL_ERROR); - } finally { - containerManager.readUnlock(); - } - } - - // Create a temporary file in the same container directory - // in the format "<chunkname>.tmp" - private static File getTmpChunkFile(File chunkFile, ChunkInfo info) - throws StorageContainerException { - return new File(chunkFile.getParent(), - chunkFile.getName() + - OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + - OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX); - } - - // Commit the chunk by renaming the temporary chunk file to chunk file - private void commitChunk(File tmpChunkFile, File chunkFile, - String containerName, long chunkLen) throws IOException { - long sizeDiff = tmpChunkFile.length() - chunkFile.length(); - // It is safe to replace here as the earlier chunk if existing should be - // caught as part of validateChunk - Files.move(tmpChunkFile.toPath(), chunkFile.toPath(), - StandardCopyOption.REPLACE_EXISTING); - containerManager.incrBytesUsed(containerName, sizeDiff); - containerManager.incrWriteCount(containerName); - containerManager.incrWriteBytes(containerName, chunkLen); - } - - /** - * reads the data defined by a chunk. - * - * @param pipeline - container pipeline. - * @param keyName - Name of the Key - * @param info - ChunkInfo. - * @return byte array - * @throws StorageContainerException - * TODO: Right now we do not support partial reads and writes of chunks. - * TODO: Explore if we need to do that for ozone. - */ - @Override - public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) - throws StorageContainerException { - containerManager.readLock(); - try { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - String containerName = pipeline.getContainerName(); - Preconditions.checkNotNull(containerName, - "Container name cannot be null"); - ContainerData container = - containerManager.readContainer(containerName); - File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info); - ByteBuffer data = ChunkUtils.readData(chunkFile, info); - containerManager.incrReadCount(containerName); - containerManager.incrReadBytes(containerName, chunkFile.length()); - return data.array(); - } catch (ExecutionException | NoSuchAlgorithmException e) { - LOG.error("read data failed. error: {}", e); - throw new StorageContainerException("Internal error: ", - e, CONTAINER_INTERNAL_ERROR); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("read data failed. error: {}", e); - throw new StorageContainerException("Internal error: ", - e, CONTAINER_INTERNAL_ERROR); - } finally { - containerManager.readUnlock(); - } - } - - /** - * Deletes a given chunk. - * - * @param pipeline - Pipeline. - * @param keyName - Key Name - * @param info - Chunk Info - * @throws StorageContainerException - */ - @Override - public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) - throws StorageContainerException { - containerManager.readLock(); - try { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - String containerName = pipeline.getContainerName(); - Preconditions.checkNotNull(containerName, - "Container name cannot be null"); - File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager - .readContainer(containerName), info); - if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) { - FileUtil.fullyDelete(chunkFile); - containerManager.decrBytesUsed(containerName, chunkFile.length()); - } else { - LOG.error("Not Supported Operation. Trying to delete a " + - "chunk that is in shared file. chunk info : " + info.toString()); - throw new StorageContainerException("Not Supported Operation. " + - "Trying to delete a chunk that is in shared file. chunk info : " - + info.toString(), UNSUPPORTED_REQUEST); - } - } finally { - containerManager.readUnlock(); - } - } - - /** - * Shutdown the chunkManager. - * - * In the chunkManager we haven't acquired any resources, so nothing to do - * here. This call is made with containerManager Writelock held. - */ - @Override - public void shutdown() { - Preconditions.checkState(this.containerManager.hasWriteLock()); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java deleted file mode 100644 index b842493..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.impl; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.datanode.StorageLocation; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.interfaces - .ContainerLocationManager; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManagerMXBean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.ObjectName; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.LinkedList; -import java.util.List; - -/** - * A class that tells the ContainerManager where to place the containers. - * Please note : There is *no* one-to-one correlation between metadata - * Locations and data Locations. - * - * For example : A user could map all container files to a - * SSD but leave data/metadata on bunch of other disks. - */ -public class ContainerLocationManagerImpl implements ContainerLocationManager, - ContainerLocationManagerMXBean { - private static final Logger LOG = - LoggerFactory.getLogger(ContainerLocationManagerImpl.class); - - private final List<ContainerStorageLocation> dataLocations; - private int currentIndex; - private final List<StorageLocation> metadataLocations; - private final ObjectName jmxbean; - - /** - * Constructs a Location Manager. - * @param metadataLocations - Refers to the metadataLocations - * where we store the container metadata. - * @param dataDirs - metadataLocations where we store the actual - * data or chunk files. - * @param conf - configuration. - * @throws IOException - */ - public ContainerLocationManagerImpl(List<StorageLocation> metadataLocations, - List<StorageLocation> dataDirs, Configuration conf) - throws IOException { - dataLocations = new LinkedList<>(); - for (StorageLocation dataDir : dataDirs) { - dataLocations.add(new ContainerStorageLocation(dataDir, conf)); - } - this.metadataLocations = metadataLocations; - jmxbean = MBeans.register("OzoneDataNode", - ContainerLocationManager.class.getSimpleName(), this); - } - - /** - * Returns the path where the container should be placed from a set of - * metadataLocations. - * - * @return A path where we should place this container and metadata. - * @throws IOException - */ - @Override - public Path getContainerPath() - throws IOException { - Preconditions.checkState(metadataLocations.size() > 0); - int index = currentIndex % metadataLocations.size(); - return Paths.get(metadataLocations.get(index).getNormalizedUri()); - } - - /** - * Returns the path where the container Data file are stored. - * - * @return a path where we place the LevelDB and data files of a container. - * @throws IOException - */ - @Override - public Path getDataPath(String containerName) throws IOException { - Path currentPath = Paths.get( - dataLocations.get(currentIndex++ % dataLocations.size()) - .getNormalizedUri()); - currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX); - return currentPath.resolve(containerName); - } - - @Override - public StorageLocationReport[] getLocationReport() throws IOException { - StorageLocationReport[] reports = - new StorageLocationReport[dataLocations.size()]; - for (int idx = 0; idx < dataLocations.size(); idx++) { - ContainerStorageLocation loc = dataLocations.get(idx); - long scmUsed = 0; - long remaining = 0; - try { - scmUsed = loc.getScmUsed(); - remaining = loc.getAvailable(); - } catch (IOException ex) { - LOG.warn("Failed to get scmUsed and remaining for container " + - "storage location {}", loc.getNormalizedUri()); - // reset scmUsed and remaining if df/du failed. - scmUsed = 0; - remaining = 0; - } - - // TODO: handle failed storage - // For now, include storage report for location that failed to get df/du. - StorageLocationReport r = new StorageLocationReport( - loc.getStorageUuId(), false, loc.getCapacity(), - scmUsed, remaining); - reports[idx] = r; - } - return reports; - } - - /** - * Supports clean shutdown of container location du threads. - * - * @throws IOException - */ - @Override - public void shutdown() throws IOException { - for (ContainerStorageLocation loc: dataLocations) { - loc.shutdown(); - } - MBeans.unregister(jmxbean); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java deleted file mode 100644 index 35a2b64..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ /dev/null @@ -1,1101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.impl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdsl.protocol.DatanodeDetails; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; -import org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; -import org.apache.hadoop.hdfs.server.datanode.StorageLocation; -import org.apache.hadoop.ozone.container.common.helpers.KeyData; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; -import org.apache.hadoop.ozone.container.common.interfaces.*; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.utils.MetadataKeyFilters; -import org.apache.hadoop.utils.MetadataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.security.DigestInputStream; -import java.security.DigestOutputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.CONTAINER_EXISTS; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.CONTAINER_NOT_FOUND; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.INVALID_CONFIG; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.IO_EXCEPTION; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.NO_SUCH_ALGORITHM; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.UNABLE_TO_READ_METADATA_DB; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.UNSUPPORTED_REQUEST; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.ERROR_IN_COMPACT_DB; -import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .Result.UNCLOSED_CONTAINER_IO; -import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION; -import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META; - -/** - * A Generic ContainerManagerImpl that will be called from Ozone - * ContainerManagerImpl. This allows us to support delta changes to ozone - * version without having to rewrite the containerManager. - */ -public class ContainerManagerImpl implements ContainerManager { - static final Logger LOG = - LoggerFactory.getLogger(ContainerManagerImpl.class); - - private final ConcurrentSkipListMap<String, ContainerStatus> - containerMap = new ConcurrentSkipListMap<>(); - - // Use a non-fair RW lock for better throughput, we may revisit this decision - // if this causes fairness issues. - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private ContainerLocationManager locationManager; - private ChunkManager chunkManager; - private KeyManager keyManager; - private Configuration conf; - private DatanodeDetails datanodeDetails; - - private ContainerDeletionChoosingPolicy containerDeletionChooser; - private ContainerReportManager containerReportManager; - - /** - * Init call that sets up a container Manager. - * - * @param config - Configuration. - * @param containerDirs - List of Metadata Container locations. - * @param dnDetails - DatanodeDetails. - * @throws IOException - */ - @Override - public void init( - Configuration config, List<StorageLocation> containerDirs, - DatanodeDetails dnDetails) throws IOException { - Preconditions.checkNotNull(config, "Config must not be null"); - Preconditions.checkNotNull(containerDirs, "Container directories cannot " + - "be null"); - Preconditions.checkNotNull(dnDetails, "Datanode Details cannot " + - "be null"); - - Preconditions.checkState(containerDirs.size() > 0, "Number of container" + - " directories must be greater than zero."); - - this.conf = config; - this.datanodeDetails = dnDetails; - - readLock(); - try { - containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass( - ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY, - TopNOrderedContainerDeletionChoosingPolicy.class, - ContainerDeletionChoosingPolicy.class), conf); - - for (StorageLocation path : containerDirs) { - File directory = Paths.get(path.getNormalizedUri()).toFile(); - if (!directory.exists() && !directory.mkdirs()) { - LOG.error("Container metadata directory doesn't exist " - + "and cannot be created. Path: {}", path.toString()); - throw new StorageContainerException("Container metadata " - + "directory doesn't exist and cannot be created " + path - .toString(), INVALID_CONFIG); - } - - // TODO: This will fail if any directory is invalid. - // We should fix this to handle invalid directories and continue. - // Leaving it this way to fail fast for time being. - if (!directory.isDirectory()) { - LOG.error("Invalid path to container metadata directory. path: {}", - path.toString()); - throw new StorageContainerException("Invalid path to container " + - "metadata directory." + path, INVALID_CONFIG); - } - LOG.info("Loading containers under {}", path); - File[] files = directory.listFiles(new ContainerFilter()); - if (files != null) { - for (File containerFile : files) { - LOG.debug("Loading container {}", containerFile); - String containerPath = - ContainerUtils.getContainerNameFromFile(containerFile); - Preconditions.checkNotNull(containerPath, "Container path cannot" + - " be null"); - readContainerInfo(containerPath); - } - } - } - - List<StorageLocation> dataDirs = new LinkedList<>(); - for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) { - StorageLocation location = StorageLocation.parse(dir); - dataDirs.add(location); - } - this.locationManager = - new ContainerLocationManagerImpl(containerDirs, dataDirs, config); - - this.containerReportManager = - new ContainerReportManagerImpl(config); - } finally { - readUnlock(); - } - } - - /** - * Reads the Container Info from a file and verifies that checksum match. If - * the checksums match, then that file is added to containerMap. - * - * @param containerName - Name which points to the persisted container. - * @throws StorageContainerException - */ - private void readContainerInfo(String containerName) - throws StorageContainerException { - Preconditions.checkState(containerName.length() > 0, - "Container name length cannot be zero."); - FileInputStream containerStream = null; - DigestInputStream dis = null; - FileInputStream metaStream = null; - Path cPath = Paths.get(containerName).getFileName(); - String keyName = null; - if (cPath != null) { - keyName = cPath.toString(); - } - Preconditions.checkNotNull(keyName, - "Container Name to container key mapping is null"); - - try { - String containerFileName = containerName.concat(CONTAINER_EXTENSION); - String metaFileName = containerName.concat(CONTAINER_META); - - containerStream = new FileInputStream(containerFileName); - - metaStream = new FileInputStream(metaFileName); - - MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - - dis = new DigestInputStream(containerStream, sha); - - ContainerProtos.ContainerData containerDataProto = - ContainerProtos.ContainerData.parseDelimitedFrom(dis); - ContainerData containerData; - if (containerDataProto == null) { - // Sometimes container metadata might have been created but empty, - // when loading the info we get a null, this often means last time - // SCM was ending up at some middle phase causing that the metadata - // was not populated. Such containers are marked as inactive. - containerMap.put(keyName, new ContainerStatus(null)); - return; - } - containerData = ContainerData.getFromProtBuf(containerDataProto, conf); - ContainerProtos.ContainerMeta meta = - ContainerProtos.ContainerMeta.parseDelimitedFrom(metaStream); - if (meta != null && !DigestUtils.sha256Hex(sha.digest()) - .equals(meta.getHash())) { - // This means we were not able read data from the disk when booted the - // datanode. We are going to rely on SCM understanding that we don't - // have valid data for this container when we send container reports. - // Hopefully SCM will ask us to delete this container and rebuild it. - LOG.error("Invalid SHA found for container data. Name :{}" - + "cowardly refusing to read invalid data", containerName); - containerMap.put(keyName, new ContainerStatus(null)); - return; - } - - ContainerStatus containerStatus = new ContainerStatus(containerData); - // Initialize pending deletion blocks count in in-memory - // container status. - MetadataStore metadata = KeyUtils.getDB(containerData, conf); - List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata - .getSequentialRangeKVs(null, Integer.MAX_VALUE, - MetadataKeyFilters.getDeletingKeyFilter()); - containerStatus.incrPendingDeletionBlocks(underDeletionBlocks.size()); - - List<Map.Entry<byte[], byte[]>> liveKeys = metadata - .getRangeKVs(null, Integer.MAX_VALUE, - MetadataKeyFilters.getNormalKeyFilter()); - - // Get container bytesUsed upon loading container - // The in-memory state is updated upon key write or delete - // TODO: update containerDataProto and persist it into container MetaFile - long bytesUsed = 0; - bytesUsed = liveKeys.parallelStream().mapToLong(e-> { - KeyData keyData; - try { - keyData = KeyUtils.getKeyData(e.getValue()); - return keyData.getSize(); - } catch (IOException ex) { - return 0L; - } - }).sum(); - containerStatus.setBytesUsed(bytesUsed); - - containerMap.put(keyName, containerStatus); - } catch (IOException | NoSuchAlgorithmException ex) { - LOG.error("read failed for file: {} ex: {}", containerName, - ex.getMessage()); - - // TODO : Add this file to a recovery Queue. - - // Remember that this container is busted and we cannot use it. - containerMap.put(keyName, new ContainerStatus(null)); - throw new StorageContainerException("Unable to read container info", - UNABLE_TO_READ_METADATA_DB); - } finally { - IOUtils.closeStream(dis); - IOUtils.closeStream(containerStream); - IOUtils.closeStream(metaStream); - } - } - - /** - * Creates a container with the given name. - * - * @param pipeline -- Nodes which make up this container. - * @param containerData - Container Name and metadata. - * @throws StorageContainerException - Exception - */ - @Override - public void createContainer(Pipeline pipeline, ContainerData containerData) - throws StorageContainerException { - Preconditions.checkNotNull(containerData, "Container data cannot be null"); - writeLock(); - try { - if (containerMap.containsKey(containerData.getName())) { - LOG.debug("container already exists. {}", containerData.getName()); - throw new StorageContainerException("container already exists.", - CONTAINER_EXISTS); - } - - // This is by design. We first write and close the - // container Info and metadata to a directory. - // Then read back and put that info into the containerMap. - // This allows us to make sure that our write is consistent. - - writeContainerInfo(containerData, false); - File cFile = new File(containerData.getContainerPath()); - readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile)); - } catch (NoSuchAlgorithmException ex) { - LOG.error("Internal error: We seem to be running a JVM without a " + - "needed hash algorithm."); - throw new StorageContainerException("failed to create container", - NO_SUCH_ALGORITHM); - } finally { - writeUnlock(); - } - - } - - /** - * Writes a container to a chosen location and updates the container Map. - * - * The file formats of ContainerData and Container Meta is the following. - * - * message ContainerData { - * required string name = 1; - * repeated KeyValue metadata = 2; - * optional string dbPath = 3; - * optional string containerPath = 4; - * optional int64 bytesUsed = 5; - * optional int64 size = 6; - * } - * - * message ContainerMeta { - * required string fileName = 1; - * required string hash = 2; - * } - * - * @param containerData - container Data - * @param overwrite - Whether we are overwriting. - * @throws StorageContainerException, NoSuchAlgorithmException - */ - private void writeContainerInfo(ContainerData containerData, - boolean overwrite) - throws StorageContainerException, NoSuchAlgorithmException { - - Preconditions.checkNotNull(this.locationManager, - "Internal error: location manager cannot be null"); - - FileOutputStream containerStream = null; - DigestOutputStream dos = null; - FileOutputStream metaStream = null; - - try { - Path metadataPath = null; - Path location = (!overwrite) ? locationManager.getContainerPath(): - Paths.get(containerData.getContainerPath()).getParent(); - if (location == null) { - throw new StorageContainerException( - "Failed to get container file path.", - CONTAINER_INTERNAL_ERROR); - } - - File containerFile = ContainerUtils.getContainerFile(containerData, - location); - File metadataFile = ContainerUtils.getMetadataFile(containerData, - location); - String containerName = containerData.getContainerName(); - - if(!overwrite) { - ContainerUtils.verifyIsNewContainer(containerFile, metadataFile); - metadataPath = this.locationManager.getDataPath(containerName); - metadataPath = ContainerUtils.createMetadata(metadataPath, - containerName, conf); - } else { - metadataPath = ContainerUtils.getMetadataDirectory(containerData); - } - - containerStream = new FileOutputStream(containerFile); - metaStream = new FileOutputStream(metadataFile); - MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - - dos = new DigestOutputStream(containerStream, sha); - containerData.setDBPath(metadataPath.resolve( - ContainerUtils.getContainerDbFileName(containerName)) - .toString()); - containerData.setContainerPath(containerFile.toString()); - - ContainerProtos.ContainerData protoData = containerData - .getProtoBufMessage(); - protoData.writeDelimitedTo(dos); - - ContainerProtos.ContainerMeta protoMeta = ContainerProtos - .ContainerMeta.newBuilder() - .setFileName(containerFile.toString()) - .setHash(DigestUtils.sha256Hex(sha.digest())) - .build(); - protoMeta.writeDelimitedTo(metaStream); - - } catch (IOException ex) { - // TODO : we need to clean up partially constructed files - // The proper way to do would be for a thread - // to read all these 3 artifacts and make sure they are - // sane. That info needs to come from the replication - // pipeline, and if not consistent delete these file. - - // In case of ozone this is *not* a deal breaker since - // SCM is guaranteed to generate unique container names. - // The saving grace is that we check if we have residue files - // lying around when creating a new container. We need to queue - // this information to a cleaner thread. - - LOG.error("Creation of container failed. Name: {}, we might need to " + - "cleanup partially created artifacts. ", - containerData.getContainerName(), ex); - throw new StorageContainerException("Container creation failed. ", - ex, CONTAINER_INTERNAL_ERROR); - } finally { - IOUtils.closeStream(dos); - IOUtils.closeStream(containerStream); - IOUtils.closeStream(metaStream); - } - } - - /** - * Deletes an existing container. - * - * @param pipeline - nodes that make this container. - * @param containerName - name of the container. - * @param forceDelete - whether this container should be deleted forcibly. - * @throws StorageContainerException - */ - @Override - public void deleteContainer(Pipeline pipeline, String containerName, - boolean forceDelete) throws StorageContainerException { - Preconditions.checkNotNull(containerName, "Container name cannot be null"); - Preconditions.checkState(containerName.length() > 0, - "Container name length cannot be zero."); - writeLock(); - try { - if (isOpen(pipeline.getContainerName())) { - throw new StorageContainerException( - "Deleting an open container is not allowed.", - UNCLOSED_CONTAINER_IO); - } - - ContainerStatus status = containerMap.get(containerName); - if (status == null) { - LOG.debug("No such container. Name: {}", containerName); - throw new StorageContainerException("No such container. Name : " + - containerName, CONTAINER_NOT_FOUND); - } - if (status.getContainer() == null) { - LOG.debug("Invalid container data. Name: {}", containerName); - throw new StorageContainerException("Invalid container data. Name : " + - containerName, CONTAINER_NOT_FOUND); - } - ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete); - containerMap.remove(containerName); - } catch (StorageContainerException e) { - throw e; - } catch (IOException e) { - // TODO : An I/O error during delete can leave partial artifacts on the - // disk. We will need the cleaner thread to cleanup this information. - LOG.error("Failed to cleanup container. Name: {}", containerName, e); - throw new StorageContainerException(containerName, e, IO_EXCEPTION); - } finally { - writeUnlock(); - } - } - - /** - * A simple interface for container Iterations. - * <p/> - * This call make no guarantees about consistency of the data between - * different list calls. It just returns the best known data at that point of - * time. It is possible that using this iteration you can miss certain - * container from the listing. - * - * @param prefix - Return keys that match this prefix. - * @param count - how many to return - * @param prevKey - Previous Key Value or empty String. - * @param data - Actual containerData - * @throws StorageContainerException - */ - @Override - public void listContainer(String prefix, long count, String prevKey, - List<ContainerData> data) throws StorageContainerException { - // TODO : Support list with Prefix and PrevKey - Preconditions.checkNotNull(data, - "Internal assertion: data cannot be null"); - readLock(); - try { - ConcurrentNavigableMap<String, ContainerStatus> map; - if (prevKey == null || prevKey.isEmpty()) { - map = containerMap.tailMap(containerMap.firstKey(), true); - } else { - map = containerMap.tailMap(prevKey, false); - } - - int currentCount = 0; - for (ContainerStatus entry : map.values()) { - if (currentCount < count) { - data.add(entry.getContainer()); - currentCount++; - } else { - return; - } - } - } finally { - readUnlock(); - } - } - - /** - * Get metadata about a specific container. - * - * @param containerName - Name of the container - * @return ContainerData - Container Data. - * @throws StorageContainerException - */ - @Override - public ContainerData readContainer(String containerName) throws - StorageContainerException { - Preconditions.checkNotNull(containerName, "Container name cannot be null"); - Preconditions.checkState(containerName.length() > 0, - "Container name length cannot be zero."); - if (!containerMap.containsKey(containerName)) { - throw new StorageContainerException("Unable to find the container. Name: " - + containerName, CONTAINER_NOT_FOUND); - } - ContainerData cData = containerMap.get(containerName).getContainer(); - if (cData == null) { - throw new StorageContainerException("Invalid container data. Name: " - + containerName, CONTAINER_INTERNAL_ERROR); - } - return cData; - } - - /** - * Closes a open container, if it is already closed or does not exist a - * StorageContainerException is thrown. - * - * @param containerName - Name of the container. - * @throws StorageContainerException - */ - @Override - public void closeContainer(String containerName) - throws StorageContainerException, NoSuchAlgorithmException { - ContainerData containerData = readContainer(containerName); - containerData.closeContainer(); - writeContainerInfo(containerData, true); - MetadataStore db = KeyUtils.getDB(containerData, conf); - - // It is ok if this operation takes a bit of time. - // Close container is not expected to be instantaneous. - try { - db.compactDB(); - } catch (IOException e) { - LOG.error("Error in DB compaction while closing container", e); - throw new StorageContainerException(e, ERROR_IN_COMPACT_DB); - } - - // Active is different from closed. Closed means it is immutable, active - // false means we have some internal error that is happening to this - // container. This is a way to track damaged containers if we have an - // I/O failure, this allows us to take quick action in case of container - // issues. - - ContainerStatus status = new ContainerStatus(containerData); - containerMap.put(containerName, status); - } - - @Override - public void updateContainer(Pipeline pipeline, String containerName, - ContainerData data, boolean forceUpdate) - throws StorageContainerException { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - Preconditions.checkNotNull(containerName, "Container name cannot be null"); - Preconditions.checkNotNull(data, "Container data cannot be null"); - FileOutputStream containerStream = null; - DigestOutputStream dos = null; - MessageDigest sha = null; - File containerFileBK = null, containerFile = null; - boolean deleted = false; - - if(!containerMap.containsKey(containerName)) { - throw new StorageContainerException("Container doesn't exist. Name :" - + containerName, CONTAINER_NOT_FOUND); - } - - try { - sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - } catch (NoSuchAlgorithmException e) { - throw new StorageContainerException("Unable to create Message Digest," - + " usually this is a java configuration issue.", - NO_SUCH_ALGORITHM); - } - - try { - Path location = locationManager.getContainerPath(); - ContainerData orgData = containerMap.get(containerName).getContainer(); - if (orgData == null) { - // updating a invalid container - throw new StorageContainerException("Update a container with invalid" + - "container meta data", CONTAINER_INTERNAL_ERROR); - } - - if (!forceUpdate && !orgData.isOpen()) { - throw new StorageContainerException( - "Update a closed container is not allowed. Name: " + containerName, - UNSUPPORTED_REQUEST); - } - - containerFile = ContainerUtils.getContainerFile(orgData, location); - // If forceUpdate is true, there is no need to check - // whether the container file exists. - if (!forceUpdate) { - if (!containerFile.exists() || !containerFile.canWrite()) { - throw new StorageContainerException( - "Container file not exists or corrupted. Name: " + containerName, - CONTAINER_INTERNAL_ERROR); - } - - // Backup the container file - containerFileBK = File.createTempFile( - "tmp_" + System.currentTimeMillis() + "_", - containerFile.getName(), containerFile.getParentFile()); - FileUtils.copyFile(containerFile, containerFileBK); - - deleted = containerFile.delete(); - containerStream = new FileOutputStream(containerFile); - dos = new DigestOutputStream(containerStream, sha); - - ContainerProtos.ContainerData protoData = data.getProtoBufMessage(); - protoData.writeDelimitedTo(dos); - } - - // Update the in-memory map - ContainerStatus newStatus = new ContainerStatus(data); - containerMap.replace(containerName, newStatus); - } catch (IOException e) { - // Restore the container file from backup - if(containerFileBK != null && containerFileBK.exists() && deleted) { - if(containerFile.delete() - && containerFileBK.renameTo(containerFile)) { - throw new StorageContainerException("Container update failed," - + " container data restored from the backup.", - CONTAINER_INTERNAL_ERROR); - } else { - throw new StorageContainerException( - "Failed to restore container data from the backup. Name: " - + containerName, CONTAINER_INTERNAL_ERROR); - } - } else { - throw new StorageContainerException( - e.getMessage(), CONTAINER_INTERNAL_ERROR); - } - } finally { - if (containerFileBK != null && containerFileBK.exists()) { - if(!containerFileBK.delete()) { - LOG.warn("Unable to delete container file backup : {}.", - containerFileBK.getAbsolutePath()); - } - } - IOUtils.closeStream(dos); - IOUtils.closeStream(containerStream); - } - } - - @VisibleForTesting - protected File getContainerFile(ContainerData data) throws IOException { - return ContainerUtils.getContainerFile(data, - this.locationManager.getContainerPath()); - } - - /** - * Checks if a container exists. - * - * @param containerName - Name of the container. - * @return true if the container is open false otherwise. - * @throws StorageContainerException - Throws Exception if we are not able to - * find the container. - */ - @Override - public boolean isOpen(String containerName) throws StorageContainerException { - final ContainerStatus status = containerMap.get(containerName); - if (status == null) { - throw new StorageContainerException( - "Container status not found: " + containerName, CONTAINER_NOT_FOUND); - } - final ContainerData cData = status.getContainer(); - if (cData == null) { - throw new StorageContainerException( - "Container not found: " + containerName, CONTAINER_NOT_FOUND); - } - return cData.isOpen(); - } - - /** - * Supports clean shutdown of container. - * - * @throws IOException - */ - @Override - public void shutdown() throws IOException { - Preconditions.checkState(this.hasWriteLock(), - "Assumption that we are holding the lock violated."); - this.containerMap.clear(); - this.locationManager.shutdown(); - } - - - @VisibleForTesting - public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() { - return containerMap; - } - - /** - * Acquire read lock. - */ - @Override - public void readLock() { - this.lock.readLock().lock(); - - } - - @Override - public void readLockInterruptibly() throws InterruptedException { - this.lock.readLock().lockInterruptibly(); - } - - /** - * Release read lock. - */ - @Override - public void readUnlock() { - this.lock.readLock().unlock(); - } - - /** - * Check if the current thread holds read lock. - */ - @Override - public boolean hasReadLock() { - return this.lock.readLock().tryLock(); - } - - /** - * Acquire write lock. - */ - @Override - public void writeLock() { - this.lock.writeLock().lock(); - } - - /** - * Acquire write lock, unless interrupted while waiting. - */ - @Override - public void writeLockInterruptibly() throws InterruptedException { - this.lock.writeLock().lockInterruptibly(); - - } - - /** - * Release write lock. - */ - @Override - public void writeUnlock() { - this.lock.writeLock().unlock(); - - } - - /** - * Check if the current thread holds write lock. - */ - @Override - public boolean hasWriteLock() { - return this.lock.writeLock().isHeldByCurrentThread(); - } - - public ChunkManager getChunkManager() { - return this.chunkManager; - } - - /** - * Sets the chunk Manager. - * - * @param chunkManager - Chunk Manager - */ - public void setChunkManager(ChunkManager chunkManager) { - this.chunkManager = chunkManager; - } - - /** - * Gets the Key Manager. - * - * @return KeyManager. - */ - @Override - public KeyManager getKeyManager() { - return this.keyManager; - } - - /** - * Get the node report. - * @return node report. - */ - @Override - public SCMNodeReport getNodeReport() throws IOException { - StorageLocationReport[] reports = locationManager.getLocationReport(); - SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); - for (int i = 0; i < reports.length; i++) { - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); - nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId()) - .setCapacity(reports[i].getCapacity()) - .setScmUsed(reports[i].getScmUsed()) - .setRemaining(reports[i].getRemaining()) - .build()); - } - return nrb.build(); - } - - - /** - * Gets container reports. - * - * @return List of all closed containers. - * @throws IOException - */ - @Override - public List<ContainerData> getContainerReports() throws IOException { - LOG.debug("Starting container report iteration."); - // No need for locking since containerMap is a ConcurrentSkipListMap - // And we can never get the exact state since close might happen - // after we iterate a point. - return containerMap.entrySet().stream() - .filter(containerStatus -> - !containerStatus.getValue().getContainer().isOpen()) - .map(containerStatus -> containerStatus.getValue().getContainer()) - .collect(Collectors.toList()); - } - - /** - * Get container report. - * - * @return The container report. - * @throws IOException - */ - @Override - public ContainerReportsRequestProto getContainerReport() throws IOException { - LOG.debug("Starting container report iteration."); - // No need for locking since containerMap is a ConcurrentSkipListMap - // And we can never get the exact state since close might happen - // after we iterate a point. - List<ContainerStatus> containers = containerMap.values().stream() - .collect(Collectors.toList()); - - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - - // TODO: support delta based container report - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(ContainerReportsRequestProto.reportType.fullReport); - - for (ContainerStatus container: containers) { - StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = - StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); - ciBuilder.setContainerName(container.getContainer().getContainerName()) - .setSize(container.getContainer().getMaxSize()) - .setUsed(container.getContainer().getBytesUsed()) - .setKeyCount(container.getContainer().getKeyCount()) - .setReadCount(container.getReadCount()) - .setWriteCount(container.getWriteCount()) - .setReadBytes(container.getReadBytes()) - .setWriteBytes(container.getWriteBytes()) - .setContainerID(container.getContainer().getContainerID()); - - if (container.getContainer().getHash() != null) { - ciBuilder.setFinalhash(container.getContainer().getHash()); - } - crBuilder.addReports(ciBuilder.build()); - } - - return crBuilder.build(); - } - - /** - * Sets the Key Manager. - * - * @param keyManager - Key Manager. - */ - @Override - public void setKeyManager(KeyManager keyManager) { - this.keyManager = keyManager; - } - - /** - * Filter out only container files from the container metadata dir. - */ - private static class ContainerFilter implements FilenameFilter { - /** - * Tests if a specified file should be included in a file list. - * - * @param dir the directory in which the file was found. - * @param name the name of the file. - * @return <code>true</code> if and only if the name should be included in - * the file list; <code>false</code> otherwise. - */ - @Override - public boolean accept(File dir, String name) { - return name.endsWith(CONTAINER_EXTENSION); - } - } - - @Override - public List<ContainerData> chooseContainerForBlockDeletion( - int count) throws StorageContainerException { - readLock(); - try { - return containerDeletionChooser.chooseContainerForBlockDeletion( - count, containerMap); - } finally { - readUnlock(); - } - } - - @VisibleForTesting - public ContainerDeletionChoosingPolicy getContainerDeletionChooser() { - return containerDeletionChooser; - } - - @Override - public void incrPendingDeletionBlocks(int numBlocks, String containerId) { - writeLock(); - try { - ContainerStatus status = containerMap.get(containerId); - status.incrPendingDeletionBlocks(numBlocks); - } finally { - writeUnlock(); - } - } - - @Override - public void decrPendingDeletionBlocks(int numBlocks, String containerId) { - writeLock(); - try { - ContainerStatus status = containerMap.get(containerId); - status.decrPendingDeletionBlocks(numBlocks); - } finally { - writeUnlock(); - } - } - - /** - * Increase the read count of the container. - * - * @param containerName - Name of the container. - */ - @Override - public void incrReadCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); - status.incrReadCount(); - } - - public long getReadCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); - return status.getReadCount(); - } - - /** - * Increse the read counter for bytes read from the container. - * - * @param containerName - Name of the container. - * @param readBytes - bytes read from the container. - */ - @Override - public void incrReadBytes(String containerName, long readBytes) { - ContainerStatus status = containerMap.get(containerName); - status.incrReadBytes(readBytes); - } - - public long getReadBytes(String containerName) { - readLock(); - try { - ContainerStatus status = containerMap.get(containerName); - return status.getReadBytes(); - } finally { - readUnlock(); - } - } - - /** - * Increase the write count of the container. - * - * @param containerName - Name of the container. - */ - @Override - public void incrWriteCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); - status.incrWriteCount(); - } - - public long getWriteCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); - return status.getWriteCount(); - } - - /** - * Increse the write counter for bytes write into the container. - * - * @param containerName - Name of the container. - * @param writeBytes - bytes write into the container. - */ - @Override - public void incrWriteBytes(String containerName, long writeBytes) { - ContainerStatus status = containerMap.get(containerName); - status.incrWriteBytes(writeBytes); - } - - public long getWriteBytes(String containerName) { - ContainerStatus status = containerMap.get(containerName); - return status.getWriteBytes(); - } - - /** - * Increase the bytes used by the container. - * - * @param containerName - Name of the container. - * @param used - additional bytes used by the container. - * @return the current bytes used. - */ - @Override - public long incrBytesUsed(String containerName, long used) { - ContainerStatus status = containerMap.get(containerName); - return status.incrBytesUsed(used); - } - - /** - * Decrease the bytes used by the container. - * - * @param containerName - Name of the container. - * @param used - additional bytes reclaimed by the container. - * @return the current bytes used. - */ - @Override - public long decrBytesUsed(String containerName, long used) { - ContainerStatus status = containerMap.get(containerName); - return status.decrBytesUsed(used); - } - - public long getBytesUsed(String containerName) { - ContainerStatus status = containerMap.get(containerName); - return status.getBytesUsed(); - } - - /** - * Get the number of keys in the container. - * - * @param containerName - Name of the container. - * @return the current key count. - */ - @Override - public long getNumKeys(String containerName) { - ContainerStatus status = containerMap.get(containerName); - return status.getNumKeys(); } - - /** - * Get the container report state to send via HB to SCM. - * - * @return container report state. - */ - @Override - public ReportState getContainerReportState() { - return containerReportManager.getContainerReportState(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java deleted file mode 100644 index a300767..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.impl; - -import org.apache.commons.lang3.RandomUtils; -import static org.apache.hadoop.ozone.scm.HdslServerUtil - .getScmHeartbeatInterval; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerReportManager; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; -import org.apache.hadoop.util.Time; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Class wraps the container report operations on datanode. - * // TODO: support incremental/delta container report - */ -public class ContainerReportManagerImpl implements ContainerReportManager { - // Last non-empty container report time - private long lastContainerReportTime; - private final long containerReportInterval; - private final long heartbeatInterval; - private AtomicLong reportCount; - private static final ReportState NO_CONTAINER_REPORTSTATE = - ReportState.newBuilder() - .setState(ReportState.states.noContainerReports) - .setCount(0).build(); - - public ContainerReportManagerImpl(Configuration config) { - this.lastContainerReportTime = -1; - this.reportCount = new AtomicLong(0L); - this.containerReportInterval = config.getTimeDuration( - OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL, - OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - this.heartbeatInterval = getScmHeartbeatInterval(config); - } - - public ReportState getContainerReportState() { - if (lastContainerReportTime < 0) { - return getFullContainerReportState(); - } else { - // Add a random delay (0~30s) on top of the container report - // interval (60s) so tha the SCM is overwhelmed by the container reports - // sent in sync. - if (Time.monotonicNow() - lastContainerReportTime > - (containerReportInterval + getRandomReportDelay())) { - return getFullContainerReportState(); - } else { - return getNoContainerReportState(); - } - } - } - - private ReportState getFullContainerReportState() { - ReportState.Builder rsBuilder = ReportState.newBuilder(); - rsBuilder.setState(ReportState.states.completeContinerReport); - rsBuilder.setCount(reportCount.incrementAndGet()); - this.lastContainerReportTime = Time.monotonicNow(); - return rsBuilder.build(); - } - - private ReportState getNoContainerReportState() { - return NO_CONTAINER_REPORTSTATE; - } - - private long getRandomReportDelay() { - return RandomUtils.nextLong(0, heartbeatInterval); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java deleted file mode 100644 index 5577323..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.impl; - -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * This class represents the state of a container. if the - * container reading encountered an error when we boot up we will post that - * info to a recovery queue and keep the info in the containerMap. - * <p/> - * if and when the issue is fixed, the expectation is that this entry will be - * deleted by the recovery thread from the containerMap and will insert entry - * instead of modifying this class. - */ -public class ContainerStatus { - private final ContainerData containerData; - - /** - * Number of pending deletion blocks in container. - */ - private int numPendingDeletionBlocks; - - private AtomicLong readBytes; - - private AtomicLong writeBytes; - - private AtomicLong readCount; - - private AtomicLong writeCount; - - /** - * Creates a Container Status class. - * - * @param containerData - ContainerData. - */ - ContainerStatus(ContainerData containerData) { - this.numPendingDeletionBlocks = 0; - this.containerData = containerData; - this.readCount = new AtomicLong(0L); - this.readBytes = new AtomicLong(0L); - this.writeCount = new AtomicLong(0L); - this.writeBytes = new AtomicLong(0L); - } - - /** - * Returns container if it is active. It is not active if we have had an - * error and we are waiting for the background threads to fix the issue. - * - * @return ContainerData. - */ - public ContainerData getContainer() { - return containerData; - } - - /** - * Increase the count of pending deletion blocks. - * - * @param numBlocks increment number - */ - public void incrPendingDeletionBlocks(int numBlocks) { - this.numPendingDeletionBlocks += numBlocks; - } - - /** - * Decrease the count of pending deletion blocks. - * - * @param numBlocks decrement number - */ - public void decrPendingDeletionBlocks(int numBlocks) { - this.numPendingDeletionBlocks -= numBlocks; - } - - /** - * Get the number of pending deletion blocks. - */ - public int getNumPendingDeletionBlocks() { - return this.numPendingDeletionBlocks; - } - - /** - * Get the number of bytes read from the container. - * @return the number of bytes read from the container. - */ - public long getReadBytes() { - return readBytes.get(); - } - - /** - * Increase the number of bytes read from the container. - * @param bytes number of bytes read. - */ - public void incrReadBytes(long bytes) { - this.readBytes.addAndGet(bytes); - } - - /** - * Get the number of times the container is read. - * @return the number of times the container is read. - */ - public long getReadCount() { - return readCount.get(); - } - - /** - * Increase the number of container read count by 1. - */ - public void incrReadCount() { - this.readCount.incrementAndGet(); - } - - /** - * Get the number of bytes write into the container. - * @return the number of bytes write into the container. - */ - public long getWriteBytes() { - return writeBytes.get(); - } - - /** - * Increase the number of bytes write into the container. - * @param bytes the number of bytes write into the container. - */ - public void incrWriteBytes(long bytes) { - this.writeBytes.addAndGet(bytes); - } - - /** - * Get the number of writes into the container. - * @return the number of writes into the container. - */ - public long getWriteCount() { - return writeCount.get(); - } - - /** - * Increase the number of writes into the container by 1. - */ - public void incrWriteCount() { - this.writeCount.incrementAndGet(); - } - - /** - * Get the number of bytes used by the container. - * @return the number of bytes used by the container. - */ - public long getBytesUsed() { - return containerData.getBytesUsed(); - } - - /** - * Increase the number of bytes used by the container. - * @param used number of bytes used by the container. - * @return the current number of bytes used by the container afert increase. - */ - public long incrBytesUsed(long used) { - return containerData.addBytesUsed(used); - } - - /** - * Set the number of bytes used by the container. - * @param used the number of bytes used by the container. - */ - public void setBytesUsed(long used) { - containerData.setBytesUsed(used); - } - - /** - * Decrease the number of bytes used by the container. - * @param reclaimed the number of bytes reclaimed from the container. - * @return the current number of bytes used by the container after decrease. - */ - public long decrBytesUsed(long reclaimed) { - return this.containerData.addBytesUsed(-1L * reclaimed); - } - - /** - * Get the maximum container size. - * @return the maximum container size. - */ - public long getMaxSize() { - return containerData.getMaxSize(); - } - - /** - * Set the maximum container size. - * @param size the maximum container size. - */ - public void setMaxSize(long size) { - this.containerData.setMaxSize(size); - } - - /** - * Get the number of keys in the container. - * @return the number of keys in the container. - */ - public long getNumKeys() { - return containerData.getKeyCount(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java deleted file mode 100644 index 07a3a53..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.impl; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CachingGetSpaceUsed; -import org.apache.hadoop.fs.DF; -import org.apache.hadoop.fs.GetSpaceUsed; -import org.apache.hadoop.hdfs.server.datanode.StorageLocation; -import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.OutputStreamWriter; -import java.io.FileOutputStream; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.nio.file.Paths; -import java.util.Scanner; - -import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY; - -/** - * Class that wraps the space usage of the Datanode Container Storage Location - * by SCM containers. - */ -public class ContainerStorageLocation { - private static final Logger LOG = - LoggerFactory.getLogger(ContainerStorageLocation.class); - - private static final String DU_CACHE_FILE = "scmUsed"; - private volatile boolean scmUsedSaved = false; - - private final StorageLocation dataLocation; - private final String storageUuId; - private final DF usage; - private final GetSpaceUsed scmUsage; - private final File scmUsedFile; - - public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf) - throws IOException { - this.dataLocation = dataLoc; - this.storageUuId = DatanodeStorage.generateUuid(); - File dataDir = Paths.get(dataLoc.getNormalizedUri()).resolve( - OzoneConsts.CONTAINER_PREFIX).toFile(); - // Initialize container data root if it does not exist as required by DF/DU - if (!dataDir.exists()) { - if (!dataDir.mkdirs()) { - LOG.error("Unable to create the container storage location at : {}", - dataDir); - throw new IllegalArgumentException("Unable to create the container" + - " storage location at : " + dataDir); - } - } - scmUsedFile = new File(dataDir, DU_CACHE_FILE); - // get overall disk usage - this.usage = new DF(dataDir, conf); - // get SCM specific usage - this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir) - .setConf(conf) - .setInitialUsed(loadScmUsed()) - .build(); - - // Ensure scm usage is saved during shutdown. - ShutdownHookManager.get().addShutdownHook( - new Runnable() { - @Override - public void run() { - if (!scmUsedSaved) { - saveScmUsed(); - } - } - }, SHUTDOWN_HOOK_PRIORITY); - } - - public URI getNormalizedUri() { - return dataLocation.getNormalizedUri(); - } - - public String getStorageUuId() { - return storageUuId; - } - public long getCapacity() { - long capacity = usage.getCapacity(); - return (capacity > 0) ? capacity : 0; - } - - public long getAvailable() throws IOException { - long remaining = getCapacity() - getScmUsed(); - long available = usage.getAvailable(); - if (remaining > available) { - remaining = available; - } - return (remaining > 0) ? remaining : 0; - } - - public long getScmUsed() throws IOException{ - return scmUsage.getUsed(); - } - - public void shutdown() { - saveScmUsed(); - scmUsedSaved = true; - - if (scmUsage instanceof CachingGetSpaceUsed) { - IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage)); - } - } - - /** - * Read in the cached DU value and return it if it is less than 600 seconds - * old (DU update interval). Slight imprecision of scmUsed is not critical - * and skipping DU can significantly shorten the startup time. - * If the cached value is not available or too old, -1 is returned. - */ - long loadScmUsed() { - long cachedScmUsed; - long mtime; - Scanner sc; - - try { - sc = new Scanner(scmUsedFile, "UTF-8"); - } catch (FileNotFoundException fnfe) { - return -1; - } - - try { - // Get the recorded scmUsed from the file. - if (sc.hasNextLong()) { - cachedScmUsed = sc.nextLong(); - } else { - return -1; - } - // Get the recorded mtime from the file. - if (sc.hasNextLong()) { - mtime = sc.nextLong(); - } else { - return -1; - } - - // Return the cached value if mtime is okay. - if (mtime > 0 && (Time.now() - mtime < 600000L)) { - LOG.info("Cached ScmUsed found for {} : {} ", dataLocation, - cachedScmUsed); - return cachedScmUsed; - } - return -1; - } finally { - sc.close(); - } - } - - /** - * Write the current scmUsed to the cache file. - */ - void saveScmUsed() { - if (scmUsedFile.exists() && !scmUsedFile.delete()) { - LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation); - } - OutputStreamWriter out = null; - try { - long used = getScmUsed(); - if (used > 0) { - out = new OutputStreamWriter(new FileOutputStream(scmUsedFile), - StandardCharsets.UTF_8); - // mtime is written last, so that truncated writes won't be valid. - out.write(Long.toString(used) + " " + Long.toString(Time.now())); - out.flush(); - out.close(); - out = null; - } - } catch (IOException ioe) { - // If write failed, the volume might be bad. Since the cache file is - // not critical, log the error and continue. - LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe); - } finally { - IOUtils.cleanupWithLogger(null, out); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org