http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java new file mode 100644 index 0000000..33eb911 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache; +import org.apache.hadoop.utils.MetadataStore; + +import java.io.IOException; +import java.nio.charset.Charset; + +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .NO_SUCH_KEY; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNABLE_TO_READ_METADATA_DB; + +/** + * Utils functions to help key functions. + */ +public final class KeyUtils { + public static final String ENCODING_NAME = "UTF-8"; + public static final Charset ENCODING = Charset.forName(ENCODING_NAME); + + /** + * Never Constructed. + */ + private KeyUtils() { + } + + /** + * Get a DB handler for a given container. + * If the handler doesn't exist in cache yet, first create one and + * add into cache. This function is called with containerManager + * ReadLock held. + * + * @param container container. + * @param conf configuration. + * @return MetadataStore handle. + * @throws StorageContainerException + */ + public static MetadataStore getDB(ContainerData container, + Configuration conf) throws StorageContainerException { + Preconditions.checkNotNull(container); + ContainerCache cache = ContainerCache.getInstance(conf); + Preconditions.checkNotNull(cache); + try { + return cache.getDB(container.getContainerName(), container.getDBPath()); + } catch (IOException ex) { + String message = + String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s", + container.getContainerName(), container.getDBPath(), ex.getMessage()); + throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB); + } + } + + /** + * Remove a DB handler from cache. + * + * @param container - Container data. + * @param conf - Configuration. + */ + public static void removeDB(ContainerData container, + Configuration conf) { + Preconditions.checkNotNull(container); + ContainerCache cache = ContainerCache.getInstance(conf); + Preconditions.checkNotNull(cache); + cache.removeDB(container.getContainerName()); + } + /** + * Shutdown all DB Handles. + * + * @param cache - Cache for DB Handles. + */ + @SuppressWarnings("unchecked") + public static void shutdownCache(ContainerCache cache) { + cache.shutdownCache(); + } + + /** + * Returns successful keyResponse. + * @param msg - Request. + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) { + return ContainerUtils.getContainerResponse(msg); + } + + + public static ContainerProtos.ContainerCommandResponseProto + getKeyDataResponse(ContainerProtos.ContainerCommandRequestProto msg, + KeyData data) { + ContainerProtos.GetKeyResponseProto.Builder getKey = ContainerProtos + .GetKeyResponseProto.newBuilder(); + getKey.setKeyData(data.getProtoBufMessage()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getContainerResponse(msg, ContainerProtos.Result + .SUCCESS, ""); + builder.setGetKey(getKey); + return builder.build(); + } + + /** + * Parses the key name from a bytes array. + * @param bytes key name in bytes. + * @return key name string. + */ + public static String getKeyName(byte[] bytes) { + return new String(bytes, ENCODING); + } + + /** + * Parses the {@link KeyData} from a bytes array. + * + * @param bytes key data in bytes. + * @return key data. + * @throws IOException if the bytes array is malformed or invalid. + */ + public static KeyData getKeyData(byte[] bytes) throws IOException { + try { + ContainerProtos.KeyData kd = ContainerProtos.KeyData.parseFrom(bytes); + KeyData data = KeyData.getFromProtoBuf(kd); + return data; + } catch (IOException e) { + throw new StorageContainerException("Failed to parse key data from the" + + " bytes array.", NO_SUCH_KEY); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java new file mode 100644 index 0000000..21f31e1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; +/** + Contains protocol buffer helper classes and utilites used in + impl. + **/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java new file mode 100644 index 0000000..b0286b9 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; + +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNSUPPORTED_REQUEST; + +/** + * An implementation of ChunkManager that is used by default in ozone. + */ +public class ChunkManagerImpl implements ChunkManager { + static final Logger LOG = + LoggerFactory.getLogger(ChunkManagerImpl.class); + + private final ContainerManager containerManager; + + /** + * Constructs a ChunkManager. + * + * @param manager - ContainerManager. + */ + public ChunkManagerImpl(ContainerManager manager) { + this.containerManager = manager; + } + + /** + * writes a given chunk. + * + * @param pipeline - Name and the set of machines that make this container. + * @param keyName - Name of the Key. + * @param info - ChunkInfo. + * @throws StorageContainerException + */ + @Override + public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, + byte[] data, ContainerProtos.Stage stage) + throws StorageContainerException { + // we don't want container manager to go away while we are writing chunks. + containerManager.readLock(); + + // TODO : Take keyManager Write lock here. + try { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); + ContainerData container = + containerManager.readContainer(containerName); + File chunkFile = ChunkUtils.validateChunk(pipeline, container, info); + File tmpChunkFile = getTmpChunkFile(chunkFile, info); + + LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file", + info.getChunkName(), stage, chunkFile, tmpChunkFile); + switch (stage) { + case WRITE_DATA: + ChunkUtils.writeData(tmpChunkFile, info, data); + break; + case COMMIT_DATA: + commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen()); + break; + case COMBINED: + // directly write to the chunk file + long oldSize = chunkFile.length(); + ChunkUtils.writeData(chunkFile, info, data); + long newSize = chunkFile.length(); + containerManager.incrBytesUsed(containerName, newSize - oldSize); + containerManager.incrWriteCount(containerName); + containerManager.incrWriteBytes(containerName, info.getLen()); + break; + } + } catch (ExecutionException | NoSuchAlgorithmException | IOException e) { + LOG.error("write data failed. error: {}", e); + throw new StorageContainerException("Internal error: ", e, + CONTAINER_INTERNAL_ERROR); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("write data failed. error: {}", e); + throw new StorageContainerException("Internal error: ", e, + CONTAINER_INTERNAL_ERROR); + } finally { + containerManager.readUnlock(); + } + } + + // Create a temporary file in the same container directory + // in the format "<chunkname>.tmp" + private static File getTmpChunkFile(File chunkFile, ChunkInfo info) + throws StorageContainerException { + return new File(chunkFile.getParent(), + chunkFile.getName() + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX); + } + + // Commit the chunk by renaming the temporary chunk file to chunk file + private void commitChunk(File tmpChunkFile, File chunkFile, + String containerName, long chunkLen) throws IOException { + long sizeDiff = tmpChunkFile.length() - chunkFile.length(); + // It is safe to replace here as the earlier chunk if existing should be + // caught as part of validateChunk + Files.move(tmpChunkFile.toPath(), chunkFile.toPath(), + StandardCopyOption.REPLACE_EXISTING); + containerManager.incrBytesUsed(containerName, sizeDiff); + containerManager.incrWriteCount(containerName); + containerManager.incrWriteBytes(containerName, chunkLen); + } + + /** + * reads the data defined by a chunk. + * + * @param pipeline - container pipeline. + * @param keyName - Name of the Key + * @param info - ChunkInfo. + * @return byte array + * @throws StorageContainerException + * TODO: Right now we do not support partial reads and writes of chunks. + * TODO: Explore if we need to do that for ozone. + */ + @Override + public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) + throws StorageContainerException { + containerManager.readLock(); + try { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); + ContainerData container = + containerManager.readContainer(containerName); + File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info); + ByteBuffer data = ChunkUtils.readData(chunkFile, info); + containerManager.incrReadCount(containerName); + containerManager.incrReadBytes(containerName, chunkFile.length()); + return data.array(); + } catch (ExecutionException | NoSuchAlgorithmException e) { + LOG.error("read data failed. error: {}", e); + throw new StorageContainerException("Internal error: ", + e, CONTAINER_INTERNAL_ERROR); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("read data failed. error: {}", e); + throw new StorageContainerException("Internal error: ", + e, CONTAINER_INTERNAL_ERROR); + } finally { + containerManager.readUnlock(); + } + } + + /** + * Deletes a given chunk. + * + * @param pipeline - Pipeline. + * @param keyName - Key Name + * @param info - Chunk Info + * @throws StorageContainerException + */ + @Override + public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) + throws StorageContainerException { + containerManager.readLock(); + try { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); + File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager + .readContainer(containerName), info); + if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) { + FileUtil.fullyDelete(chunkFile); + containerManager.decrBytesUsed(containerName, chunkFile.length()); + } else { + LOG.error("Not Supported Operation. Trying to delete a " + + "chunk that is in shared file. chunk info : " + info.toString()); + throw new StorageContainerException("Not Supported Operation. " + + "Trying to delete a chunk that is in shared file. chunk info : " + + info.toString(), UNSUPPORTED_REQUEST); + } + } finally { + containerManager.readUnlock(); + } + } + + /** + * Shutdown the chunkManager. + * + * In the chunkManager we haven't acquired any resources, so nothing to do + * here. This call is made with containerManager Writelock held. + */ + @Override + public void shutdown() { + Preconditions.checkState(this.containerManager.hasWriteLock()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java new file mode 100644 index 0000000..e0e826c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerLocationManager; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerLocationManagerMXBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; + +/** + * A class that tells the ContainerManager where to place the containers. + * Please note : There is *no* one-to-one correlation between metadata + * Locations and data Locations. + * + * For example : A user could map all container files to a + * SSD but leave data/metadata on bunch of other disks. + */ +public class ContainerLocationManagerImpl implements ContainerLocationManager, + ContainerLocationManagerMXBean { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerLocationManagerImpl.class); + + private final List<ContainerStorageLocation> dataLocations; + private int currentIndex; + private final List<StorageLocation> metadataLocations; + private final ObjectName jmxbean; + + /** + * Constructs a Location Manager. + * @param metadataLocations - Refers to the metadataLocations + * where we store the container metadata. + * @param dataDirs - metadataLocations where we store the actual + * data or chunk files. + * @param conf - configuration. + * @throws IOException + */ + public ContainerLocationManagerImpl(List<StorageLocation> metadataLocations, + List<StorageLocation> dataDirs, Configuration conf) + throws IOException { + dataLocations = new LinkedList<>(); + for (StorageLocation dataDir : dataDirs) { + dataLocations.add(new ContainerStorageLocation(dataDir, conf)); + } + this.metadataLocations = metadataLocations; + jmxbean = MBeans.register("OzoneDataNode", + ContainerLocationManager.class.getSimpleName(), this); + } + + /** + * Returns the path where the container should be placed from a set of + * metadataLocations. + * + * @return A path where we should place this container and metadata. + * @throws IOException + */ + @Override + public Path getContainerPath() + throws IOException { + Preconditions.checkState(metadataLocations.size() > 0); + int index = currentIndex % metadataLocations.size(); + return Paths.get(metadataLocations.get(index).getNormalizedUri()); + } + + /** + * Returns the path where the container Data file are stored. + * + * @return a path where we place the LevelDB and data files of a container. + * @throws IOException + */ + @Override + public Path getDataPath(String containerName) throws IOException { + Path currentPath = Paths.get( + dataLocations.get(currentIndex++ % dataLocations.size()) + .getNormalizedUri()); + currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX); + return currentPath.resolve(containerName); + } + + @Override + public StorageLocationReport[] getLocationReport() throws IOException { + StorageLocationReport[] reports = + new StorageLocationReport[dataLocations.size()]; + for (int idx = 0; idx < dataLocations.size(); idx++) { + ContainerStorageLocation loc = dataLocations.get(idx); + long scmUsed = 0; + long remaining = 0; + try { + scmUsed = loc.getScmUsed(); + remaining = loc.getAvailable(); + } catch (IOException ex) { + LOG.warn("Failed to get scmUsed and remaining for container " + + "storage location {}", loc.getNormalizedUri()); + // reset scmUsed and remaining if df/du failed. + scmUsed = 0; + remaining = 0; + } + + // TODO: handle failed storage + // For now, include storage report for location that failed to get df/du. + StorageLocationReport r = new StorageLocationReport( + loc.getStorageUuId(), false, loc.getCapacity(), + scmUsed, remaining); + reports[idx] = r; + } + return reports; + } + + /** + * Supports clean shutdown of container location du threads. + * + * @throws IOException + */ + @Override + public void shutdown() throws IOException { + for (ContainerStorageLocation loc: dataLocations) { + loc.shutdown(); + } + MBeans.unregister(jmxbean); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java new file mode 100644 index 0000000..5e7375c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -0,0 +1,1113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerDeletionChoosingPolicy; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerLocationManager; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerReportManager; +import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.utils.MetadataKeyFilters; +import org.apache.hadoop.utils.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.DigestInputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CONTAINER_EXISTS; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CONTAINER_NOT_FOUND; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .ERROR_IN_COMPACT_DB; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .INVALID_CONFIG; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .IO_EXCEPTION; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .NO_SUCH_ALGORITHM; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNABLE_TO_READ_METADATA_DB; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNCLOSED_CONTAINER_IO; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNSUPPORTED_REQUEST; +import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION; +import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META; + +/** + * A Generic ContainerManagerImpl that will be called from Ozone + * ContainerManagerImpl. This allows us to support delta changes to ozone + * version without having to rewrite the containerManager. + */ +public class ContainerManagerImpl implements ContainerManager { + static final Logger LOG = + LoggerFactory.getLogger(ContainerManagerImpl.class); + + private final ConcurrentSkipListMap<String, ContainerStatus> + containerMap = new ConcurrentSkipListMap<>(); + + // Use a non-fair RW lock for better throughput, we may revisit this decision + // if this causes fairness issues. + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private ContainerLocationManager locationManager; + private ChunkManager chunkManager; + private KeyManager keyManager; + private Configuration conf; + private DatanodeDetails datanodeDetails; + + private ContainerDeletionChoosingPolicy containerDeletionChooser; + private ContainerReportManager containerReportManager; + + /** + * Init call that sets up a container Manager. + * + * @param config - Configuration. + * @param containerDirs - List of Metadata Container locations. + * @param dnDetails - DatanodeDetails. + * @throws IOException + */ + @Override + public void init( + Configuration config, List<StorageLocation> containerDirs, + DatanodeDetails dnDetails) throws IOException { + Preconditions.checkNotNull(config, "Config must not be null"); + Preconditions.checkNotNull(containerDirs, "Container directories cannot " + + "be null"); + Preconditions.checkNotNull(dnDetails, "Datanode Details cannot " + + "be null"); + + Preconditions.checkState(containerDirs.size() > 0, "Number of container" + + " directories must be greater than zero."); + + this.conf = config; + this.datanodeDetails = dnDetails; + + readLock(); + try { + containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass( + ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY, + TopNOrderedContainerDeletionChoosingPolicy.class, + ContainerDeletionChoosingPolicy.class), conf); + + for (StorageLocation path : containerDirs) { + File directory = Paths.get(path.getNormalizedUri()).toFile(); + if (!directory.exists() && !directory.mkdirs()) { + LOG.error("Container metadata directory doesn't exist " + + "and cannot be created. Path: {}", path.toString()); + throw new StorageContainerException("Container metadata " + + "directory doesn't exist and cannot be created " + path + .toString(), INVALID_CONFIG); + } + + // TODO: This will fail if any directory is invalid. + // We should fix this to handle invalid directories and continue. + // Leaving it this way to fail fast for time being. + if (!directory.isDirectory()) { + LOG.error("Invalid path to container metadata directory. path: {}", + path.toString()); + throw new StorageContainerException("Invalid path to container " + + "metadata directory." + path, INVALID_CONFIG); + } + LOG.info("Loading containers under {}", path); + File[] files = directory.listFiles(new ContainerFilter()); + if (files != null) { + for (File containerFile : files) { + LOG.debug("Loading container {}", containerFile); + String containerPath = + ContainerUtils.getContainerNameFromFile(containerFile); + Preconditions.checkNotNull(containerPath, "Container path cannot" + + " be null"); + readContainerInfo(containerPath); + } + } + } + + List<StorageLocation> dataDirs = new LinkedList<>(); + for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) { + StorageLocation location = StorageLocation.parse(dir); + dataDirs.add(location); + } + this.locationManager = + new ContainerLocationManagerImpl(containerDirs, dataDirs, config); + + this.containerReportManager = + new ContainerReportManagerImpl(config); + } finally { + readUnlock(); + } + } + + /** + * Reads the Container Info from a file and verifies that checksum match. If + * the checksums match, then that file is added to containerMap. + * + * @param containerName - Name which points to the persisted container. + * @throws StorageContainerException + */ + private void readContainerInfo(String containerName) + throws StorageContainerException { + Preconditions.checkState(containerName.length() > 0, + "Container name length cannot be zero."); + FileInputStream containerStream = null; + DigestInputStream dis = null; + FileInputStream metaStream = null; + Path cPath = Paths.get(containerName).getFileName(); + String keyName = null; + if (cPath != null) { + keyName = cPath.toString(); + } + Preconditions.checkNotNull(keyName, + "Container Name to container key mapping is null"); + + try { + String containerFileName = containerName.concat(CONTAINER_EXTENSION); + String metaFileName = containerName.concat(CONTAINER_META); + + containerStream = new FileInputStream(containerFileName); + + metaStream = new FileInputStream(metaFileName); + + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + + dis = new DigestInputStream(containerStream, sha); + + ContainerProtos.ContainerData containerDataProto = + ContainerProtos.ContainerData.parseDelimitedFrom(dis); + ContainerData containerData; + if (containerDataProto == null) { + // Sometimes container metadata might have been created but empty, + // when loading the info we get a null, this often means last time + // SCM was ending up at some middle phase causing that the metadata + // was not populated. Such containers are marked as inactive. + containerMap.put(keyName, new ContainerStatus(null)); + return; + } + containerData = ContainerData.getFromProtBuf(containerDataProto, conf); + ContainerProtos.ContainerMeta meta = + ContainerProtos.ContainerMeta.parseDelimitedFrom(metaStream); + if (meta != null && !DigestUtils.sha256Hex(sha.digest()) + .equals(meta.getHash())) { + // This means we were not able read data from the disk when booted the + // datanode. We are going to rely on SCM understanding that we don't + // have valid data for this container when we send container reports. + // Hopefully SCM will ask us to delete this container and rebuild it. + LOG.error("Invalid SHA found for container data. Name :{}" + + "cowardly refusing to read invalid data", containerName); + containerMap.put(keyName, new ContainerStatus(null)); + return; + } + + ContainerStatus containerStatus = new ContainerStatus(containerData); + // Initialize pending deletion blocks count in in-memory + // container status. + MetadataStore metadata = KeyUtils.getDB(containerData, conf); + List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata + .getSequentialRangeKVs(null, Integer.MAX_VALUE, + MetadataKeyFilters.getDeletingKeyFilter()); + containerStatus.incrPendingDeletionBlocks(underDeletionBlocks.size()); + + List<Map.Entry<byte[], byte[]>> liveKeys = metadata + .getRangeKVs(null, Integer.MAX_VALUE, + MetadataKeyFilters.getNormalKeyFilter()); + + // Get container bytesUsed upon loading container + // The in-memory state is updated upon key write or delete + // TODO: update containerDataProto and persist it into container MetaFile + long bytesUsed = 0; + bytesUsed = liveKeys.parallelStream().mapToLong(e-> { + KeyData keyData; + try { + keyData = KeyUtils.getKeyData(e.getValue()); + return keyData.getSize(); + } catch (IOException ex) { + return 0L; + } + }).sum(); + containerStatus.setBytesUsed(bytesUsed); + + containerMap.put(keyName, containerStatus); + } catch (IOException | NoSuchAlgorithmException ex) { + LOG.error("read failed for file: {} ex: {}", containerName, + ex.getMessage()); + + // TODO : Add this file to a recovery Queue. + + // Remember that this container is busted and we cannot use it. + containerMap.put(keyName, new ContainerStatus(null)); + throw new StorageContainerException("Unable to read container info", + UNABLE_TO_READ_METADATA_DB); + } finally { + IOUtils.closeStream(dis); + IOUtils.closeStream(containerStream); + IOUtils.closeStream(metaStream); + } + } + + /** + * Creates a container with the given name. + * + * @param pipeline -- Nodes which make up this container. + * @param containerData - Container Name and metadata. + * @throws StorageContainerException - Exception + */ + @Override + public void createContainer(Pipeline pipeline, ContainerData containerData) + throws StorageContainerException { + Preconditions.checkNotNull(containerData, "Container data cannot be null"); + writeLock(); + try { + if (containerMap.containsKey(containerData.getName())) { + LOG.debug("container already exists. {}", containerData.getName()); + throw new StorageContainerException("container already exists.", + CONTAINER_EXISTS); + } + + // This is by design. We first write and close the + // container Info and metadata to a directory. + // Then read back and put that info into the containerMap. + // This allows us to make sure that our write is consistent. + + writeContainerInfo(containerData, false); + File cFile = new File(containerData.getContainerPath()); + readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile)); + } catch (NoSuchAlgorithmException ex) { + LOG.error("Internal error: We seem to be running a JVM without a " + + "needed hash algorithm."); + throw new StorageContainerException("failed to create container", + NO_SUCH_ALGORITHM); + } finally { + writeUnlock(); + } + + } + + /** + * Writes a container to a chosen location and updates the container Map. + * + * The file formats of ContainerData and Container Meta is the following. + * + * message ContainerData { + * required string name = 1; + * repeated KeyValue metadata = 2; + * optional string dbPath = 3; + * optional string containerPath = 4; + * optional int64 bytesUsed = 5; + * optional int64 size = 6; + * } + * + * message ContainerMeta { + * required string fileName = 1; + * required string hash = 2; + * } + * + * @param containerData - container Data + * @param overwrite - Whether we are overwriting. + * @throws StorageContainerException, NoSuchAlgorithmException + */ + private void writeContainerInfo(ContainerData containerData, + boolean overwrite) + throws StorageContainerException, NoSuchAlgorithmException { + + Preconditions.checkNotNull(this.locationManager, + "Internal error: location manager cannot be null"); + + FileOutputStream containerStream = null; + DigestOutputStream dos = null; + FileOutputStream metaStream = null; + + try { + Path metadataPath = null; + Path location = (!overwrite) ? locationManager.getContainerPath(): + Paths.get(containerData.getContainerPath()).getParent(); + if (location == null) { + throw new StorageContainerException( + "Failed to get container file path.", + CONTAINER_INTERNAL_ERROR); + } + + File containerFile = ContainerUtils.getContainerFile(containerData, + location); + File metadataFile = ContainerUtils.getMetadataFile(containerData, + location); + String containerName = containerData.getContainerName(); + + if(!overwrite) { + ContainerUtils.verifyIsNewContainer(containerFile, metadataFile); + metadataPath = this.locationManager.getDataPath(containerName); + metadataPath = ContainerUtils.createMetadata(metadataPath, + containerName, conf); + } else { + metadataPath = ContainerUtils.getMetadataDirectory(containerData); + } + + containerStream = new FileOutputStream(containerFile); + metaStream = new FileOutputStream(metadataFile); + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + + dos = new DigestOutputStream(containerStream, sha); + containerData.setDBPath(metadataPath.resolve( + ContainerUtils.getContainerDbFileName(containerName)) + .toString()); + containerData.setContainerPath(containerFile.toString()); + + ContainerProtos.ContainerData protoData = containerData + .getProtoBufMessage(); + protoData.writeDelimitedTo(dos); + + ContainerProtos.ContainerMeta protoMeta = ContainerProtos + .ContainerMeta.newBuilder() + .setFileName(containerFile.toString()) + .setHash(DigestUtils.sha256Hex(sha.digest())) + .build(); + protoMeta.writeDelimitedTo(metaStream); + + } catch (IOException ex) { + // TODO : we need to clean up partially constructed files + // The proper way to do would be for a thread + // to read all these 3 artifacts and make sure they are + // sane. That info needs to come from the replication + // pipeline, and if not consistent delete these file. + + // In case of ozone this is *not* a deal breaker since + // SCM is guaranteed to generate unique container names. + // The saving grace is that we check if we have residue files + // lying around when creating a new container. We need to queue + // this information to a cleaner thread. + + LOG.error("Creation of container failed. Name: {}, we might need to " + + "cleanup partially created artifacts. ", + containerData.getContainerName(), ex); + throw new StorageContainerException("Container creation failed. ", + ex, CONTAINER_INTERNAL_ERROR); + } finally { + IOUtils.closeStream(dos); + IOUtils.closeStream(containerStream); + IOUtils.closeStream(metaStream); + } + } + + /** + * Deletes an existing container. + * + * @param pipeline - nodes that make this container. + * @param containerName - name of the container. + * @param forceDelete - whether this container should be deleted forcibly. + * @throws StorageContainerException + */ + @Override + public void deleteContainer(Pipeline pipeline, String containerName, + boolean forceDelete) throws StorageContainerException { + Preconditions.checkNotNull(containerName, "Container name cannot be null"); + Preconditions.checkState(containerName.length() > 0, + "Container name length cannot be zero."); + writeLock(); + try { + if (isOpen(pipeline.getContainerName())) { + throw new StorageContainerException( + "Deleting an open container is not allowed.", + UNCLOSED_CONTAINER_IO); + } + + ContainerStatus status = containerMap.get(containerName); + if (status == null) { + LOG.debug("No such container. Name: {}", containerName); + throw new StorageContainerException("No such container. Name : " + + containerName, CONTAINER_NOT_FOUND); + } + if (status.getContainer() == null) { + LOG.debug("Invalid container data. Name: {}", containerName); + throw new StorageContainerException("Invalid container data. Name : " + + containerName, CONTAINER_NOT_FOUND); + } + ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete); + containerMap.remove(containerName); + } catch (StorageContainerException e) { + throw e; + } catch (IOException e) { + // TODO : An I/O error during delete can leave partial artifacts on the + // disk. We will need the cleaner thread to cleanup this information. + LOG.error("Failed to cleanup container. Name: {}", containerName, e); + throw new StorageContainerException(containerName, e, IO_EXCEPTION); + } finally { + writeUnlock(); + } + } + + /** + * A simple interface for container Iterations. + * <p/> + * This call make no guarantees about consistency of the data between + * different list calls. It just returns the best known data at that point of + * time. It is possible that using this iteration you can miss certain + * container from the listing. + * + * @param prefix - Return keys that match this prefix. + * @param count - how many to return + * @param prevKey - Previous Key Value or empty String. + * @param data - Actual containerData + * @throws StorageContainerException + */ + @Override + public void listContainer(String prefix, long count, String prevKey, + List<ContainerData> data) throws StorageContainerException { + // TODO : Support list with Prefix and PrevKey + Preconditions.checkNotNull(data, + "Internal assertion: data cannot be null"); + readLock(); + try { + ConcurrentNavigableMap<String, ContainerStatus> map; + if (prevKey == null || prevKey.isEmpty()) { + map = containerMap.tailMap(containerMap.firstKey(), true); + } else { + map = containerMap.tailMap(prevKey, false); + } + + int currentCount = 0; + for (ContainerStatus entry : map.values()) { + if (currentCount < count) { + data.add(entry.getContainer()); + currentCount++; + } else { + return; + } + } + } finally { + readUnlock(); + } + } + + /** + * Get metadata about a specific container. + * + * @param containerName - Name of the container + * @return ContainerData - Container Data. + * @throws StorageContainerException + */ + @Override + public ContainerData readContainer(String containerName) throws + StorageContainerException { + Preconditions.checkNotNull(containerName, "Container name cannot be null"); + Preconditions.checkState(containerName.length() > 0, + "Container name length cannot be zero."); + if (!containerMap.containsKey(containerName)) { + throw new StorageContainerException("Unable to find the container. Name: " + + containerName, CONTAINER_NOT_FOUND); + } + ContainerData cData = containerMap.get(containerName).getContainer(); + if (cData == null) { + throw new StorageContainerException("Invalid container data. Name: " + + containerName, CONTAINER_INTERNAL_ERROR); + } + return cData; + } + + /** + * Closes a open container, if it is already closed or does not exist a + * StorageContainerException is thrown. + * + * @param containerName - Name of the container. + * @throws StorageContainerException + */ + @Override + public void closeContainer(String containerName) + throws StorageContainerException, NoSuchAlgorithmException { + ContainerData containerData = readContainer(containerName); + containerData.closeContainer(); + writeContainerInfo(containerData, true); + MetadataStore db = KeyUtils.getDB(containerData, conf); + + // It is ok if this operation takes a bit of time. + // Close container is not expected to be instantaneous. + try { + db.compactDB(); + } catch (IOException e) { + LOG.error("Error in DB compaction while closing container", e); + throw new StorageContainerException(e, ERROR_IN_COMPACT_DB); + } + + // Active is different from closed. Closed means it is immutable, active + // false means we have some internal error that is happening to this + // container. This is a way to track damaged containers if we have an + // I/O failure, this allows us to take quick action in case of container + // issues. + + ContainerStatus status = new ContainerStatus(containerData); + containerMap.put(containerName, status); + } + + @Override + public void updateContainer(Pipeline pipeline, String containerName, + ContainerData data, boolean forceUpdate) + throws StorageContainerException { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + Preconditions.checkNotNull(containerName, "Container name cannot be null"); + Preconditions.checkNotNull(data, "Container data cannot be null"); + FileOutputStream containerStream = null; + DigestOutputStream dos = null; + MessageDigest sha = null; + File containerFileBK = null, containerFile = null; + boolean deleted = false; + + if(!containerMap.containsKey(containerName)) { + throw new StorageContainerException("Container doesn't exist. Name :" + + containerName, CONTAINER_NOT_FOUND); + } + + try { + sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + } catch (NoSuchAlgorithmException e) { + throw new StorageContainerException("Unable to create Message Digest," + + " usually this is a java configuration issue.", + NO_SUCH_ALGORITHM); + } + + try { + Path location = locationManager.getContainerPath(); + ContainerData orgData = containerMap.get(containerName).getContainer(); + if (orgData == null) { + // updating a invalid container + throw new StorageContainerException("Update a container with invalid" + + "container meta data", CONTAINER_INTERNAL_ERROR); + } + + if (!forceUpdate && !orgData.isOpen()) { + throw new StorageContainerException( + "Update a closed container is not allowed. Name: " + containerName, + UNSUPPORTED_REQUEST); + } + + containerFile = ContainerUtils.getContainerFile(orgData, location); + // If forceUpdate is true, there is no need to check + // whether the container file exists. + if (!forceUpdate) { + if (!containerFile.exists() || !containerFile.canWrite()) { + throw new StorageContainerException( + "Container file not exists or corrupted. Name: " + containerName, + CONTAINER_INTERNAL_ERROR); + } + + // Backup the container file + containerFileBK = File.createTempFile( + "tmp_" + System.currentTimeMillis() + "_", + containerFile.getName(), containerFile.getParentFile()); + FileUtils.copyFile(containerFile, containerFileBK); + + deleted = containerFile.delete(); + containerStream = new FileOutputStream(containerFile); + dos = new DigestOutputStream(containerStream, sha); + + ContainerProtos.ContainerData protoData = data.getProtoBufMessage(); + protoData.writeDelimitedTo(dos); + } + + // Update the in-memory map + ContainerStatus newStatus = new ContainerStatus(data); + containerMap.replace(containerName, newStatus); + } catch (IOException e) { + // Restore the container file from backup + if(containerFileBK != null && containerFileBK.exists() && deleted) { + if(containerFile.delete() + && containerFileBK.renameTo(containerFile)) { + throw new StorageContainerException("Container update failed," + + " container data restored from the backup.", + CONTAINER_INTERNAL_ERROR); + } else { + throw new StorageContainerException( + "Failed to restore container data from the backup. Name: " + + containerName, CONTAINER_INTERNAL_ERROR); + } + } else { + throw new StorageContainerException( + e.getMessage(), CONTAINER_INTERNAL_ERROR); + } + } finally { + if (containerFileBK != null && containerFileBK.exists()) { + if(!containerFileBK.delete()) { + LOG.warn("Unable to delete container file backup : {}.", + containerFileBK.getAbsolutePath()); + } + } + IOUtils.closeStream(dos); + IOUtils.closeStream(containerStream); + } + } + + @VisibleForTesting + protected File getContainerFile(ContainerData data) throws IOException { + return ContainerUtils.getContainerFile(data, + this.locationManager.getContainerPath()); + } + + /** + * Checks if a container exists. + * + * @param containerName - Name of the container. + * @return true if the container is open false otherwise. + * @throws StorageContainerException - Throws Exception if we are not able to + * find the container. + */ + @Override + public boolean isOpen(String containerName) throws StorageContainerException { + final ContainerStatus status = containerMap.get(containerName); + if (status == null) { + throw new StorageContainerException( + "Container status not found: " + containerName, CONTAINER_NOT_FOUND); + } + final ContainerData cData = status.getContainer(); + if (cData == null) { + throw new StorageContainerException( + "Container not found: " + containerName, CONTAINER_NOT_FOUND); + } + return cData.isOpen(); + } + + /** + * Supports clean shutdown of container. + * + * @throws IOException + */ + @Override + public void shutdown() throws IOException { + Preconditions.checkState(this.hasWriteLock(), + "Assumption that we are holding the lock violated."); + this.containerMap.clear(); + this.locationManager.shutdown(); + } + + + @VisibleForTesting + public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() { + return containerMap; + } + + /** + * Acquire read lock. + */ + @Override + public void readLock() { + this.lock.readLock().lock(); + + } + + @Override + public void readLockInterruptibly() throws InterruptedException { + this.lock.readLock().lockInterruptibly(); + } + + /** + * Release read lock. + */ + @Override + public void readUnlock() { + this.lock.readLock().unlock(); + } + + /** + * Check if the current thread holds read lock. + */ + @Override + public boolean hasReadLock() { + return this.lock.readLock().tryLock(); + } + + /** + * Acquire write lock. + */ + @Override + public void writeLock() { + this.lock.writeLock().lock(); + } + + /** + * Acquire write lock, unless interrupted while waiting. + */ + @Override + public void writeLockInterruptibly() throws InterruptedException { + this.lock.writeLock().lockInterruptibly(); + + } + + /** + * Release write lock. + */ + @Override + public void writeUnlock() { + this.lock.writeLock().unlock(); + + } + + /** + * Check if the current thread holds write lock. + */ + @Override + public boolean hasWriteLock() { + return this.lock.writeLock().isHeldByCurrentThread(); + } + + public ChunkManager getChunkManager() { + return this.chunkManager; + } + + /** + * Sets the chunk Manager. + * + * @param chunkManager - Chunk Manager + */ + public void setChunkManager(ChunkManager chunkManager) { + this.chunkManager = chunkManager; + } + + /** + * Gets the Key Manager. + * + * @return KeyManager. + */ + @Override + public KeyManager getKeyManager() { + return this.keyManager; + } + + /** + * Get the node report. + * @return node report. + */ + @Override + public SCMNodeReport getNodeReport() throws IOException { + StorageLocationReport[] reports = locationManager.getLocationReport(); + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + for (int i = 0; i < reports.length; i++) { + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId()) + .setCapacity(reports[i].getCapacity()) + .setScmUsed(reports[i].getScmUsed()) + .setRemaining(reports[i].getRemaining()) + .build()); + } + return nrb.build(); + } + + + /** + * Gets container reports. + * + * @return List of all closed containers. + * @throws IOException + */ + @Override + public List<ContainerData> getContainerReports() throws IOException { + LOG.debug("Starting container report iteration."); + // No need for locking since containerMap is a ConcurrentSkipListMap + // And we can never get the exact state since close might happen + // after we iterate a point. + return containerMap.entrySet().stream() + .filter(containerStatus -> + !containerStatus.getValue().getContainer().isOpen()) + .map(containerStatus -> containerStatus.getValue().getContainer()) + .collect(Collectors.toList()); + } + + /** + * Get container report. + * + * @return The container report. + * @throws IOException + */ + @Override + public ContainerReportsRequestProto getContainerReport() throws IOException { + LOG.debug("Starting container report iteration."); + // No need for locking since containerMap is a ConcurrentSkipListMap + // And we can never get the exact state since close might happen + // after we iterate a point. + List<ContainerStatus> containers = containerMap.values().stream() + .collect(Collectors.toList()); + + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + + // TODO: support delta based container report + crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setType(ContainerReportsRequestProto.reportType.fullReport); + + for (ContainerStatus container: containers) { + StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = + StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); + ciBuilder.setContainerName(container.getContainer().getContainerName()) + .setSize(container.getContainer().getMaxSize()) + .setUsed(container.getContainer().getBytesUsed()) + .setKeyCount(container.getContainer().getKeyCount()) + .setReadCount(container.getReadCount()) + .setWriteCount(container.getWriteCount()) + .setReadBytes(container.getReadBytes()) + .setWriteBytes(container.getWriteBytes()) + .setContainerID(container.getContainer().getContainerID()); + + if (container.getContainer().getHash() != null) { + ciBuilder.setFinalhash(container.getContainer().getHash()); + } + crBuilder.addReports(ciBuilder.build()); + } + + return crBuilder.build(); + } + + /** + * Sets the Key Manager. + * + * @param keyManager - Key Manager. + */ + @Override + public void setKeyManager(KeyManager keyManager) { + this.keyManager = keyManager; + } + + /** + * Filter out only container files from the container metadata dir. + */ + private static class ContainerFilter implements FilenameFilter { + /** + * Tests if a specified file should be included in a file list. + * + * @param dir the directory in which the file was found. + * @param name the name of the file. + * @return <code>true</code> if and only if the name should be included in + * the file list; <code>false</code> otherwise. + */ + @Override + public boolean accept(File dir, String name) { + return name.endsWith(CONTAINER_EXTENSION); + } + } + + @Override + public List<ContainerData> chooseContainerForBlockDeletion( + int count) throws StorageContainerException { + readLock(); + try { + return containerDeletionChooser.chooseContainerForBlockDeletion( + count, containerMap); + } finally { + readUnlock(); + } + } + + @VisibleForTesting + public ContainerDeletionChoosingPolicy getContainerDeletionChooser() { + return containerDeletionChooser; + } + + @Override + public void incrPendingDeletionBlocks(int numBlocks, String containerId) { + writeLock(); + try { + ContainerStatus status = containerMap.get(containerId); + status.incrPendingDeletionBlocks(numBlocks); + } finally { + writeUnlock(); + } + } + + @Override + public void decrPendingDeletionBlocks(int numBlocks, String containerId) { + writeLock(); + try { + ContainerStatus status = containerMap.get(containerId); + status.decrPendingDeletionBlocks(numBlocks); + } finally { + writeUnlock(); + } + } + + /** + * Increase the read count of the container. + * + * @param containerName - Name of the container. + */ + @Override + public void incrReadCount(String containerName) { + ContainerStatus status = containerMap.get(containerName); + status.incrReadCount(); + } + + public long getReadCount(String containerName) { + ContainerStatus status = containerMap.get(containerName); + return status.getReadCount(); + } + + /** + * Increse the read counter for bytes read from the container. + * + * @param containerName - Name of the container. + * @param readBytes - bytes read from the container. + */ + @Override + public void incrReadBytes(String containerName, long readBytes) { + ContainerStatus status = containerMap.get(containerName); + status.incrReadBytes(readBytes); + } + + public long getReadBytes(String containerName) { + readLock(); + try { + ContainerStatus status = containerMap.get(containerName); + return status.getReadBytes(); + } finally { + readUnlock(); + } + } + + /** + * Increase the write count of the container. + * + * @param containerName - Name of the container. + */ + @Override + public void incrWriteCount(String containerName) { + ContainerStatus status = containerMap.get(containerName); + status.incrWriteCount(); + } + + public long getWriteCount(String containerName) { + ContainerStatus status = containerMap.get(containerName); + return status.getWriteCount(); + } + + /** + * Increse the write counter for bytes write into the container. + * + * @param containerName - Name of the container. + * @param writeBytes - bytes write into the container. + */ + @Override + public void incrWriteBytes(String containerName, long writeBytes) { + ContainerStatus status = containerMap.get(containerName); + status.incrWriteBytes(writeBytes); + } + + public long getWriteBytes(String containerName) { + ContainerStatus status = containerMap.get(containerName); + return status.getWriteBytes(); + } + + /** + * Increase the bytes used by the container. + * + * @param containerName - Name of the container. + * @param used - additional bytes used by the container. + * @return the current bytes used. + */ + @Override + public long incrBytesUsed(String containerName, long used) { + ContainerStatus status = containerMap.get(containerName); + return status.incrBytesUsed(used); + } + + /** + * Decrease the bytes used by the container. + * + * @param containerName - Name of the container. + * @param used - additional bytes reclaimed by the container. + * @return the current bytes used. + */ + @Override + public long decrBytesUsed(String containerName, long used) { + ContainerStatus status = containerMap.get(containerName); + return status.decrBytesUsed(used); + } + + public long getBytesUsed(String containerName) { + ContainerStatus status = containerMap.get(containerName); + return status.getBytesUsed(); + } + + /** + * Get the number of keys in the container. + * + * @param containerName - Name of the container. + * @return the current key count. + */ + @Override + public long getNumKeys(String containerName) { + ContainerStatus status = containerMap.get(containerName); + return status.getNumKeys(); } + + /** + * Get the container report state to send via HB to SCM. + * + * @return container report state. + */ + @Override + public ReportState getContainerReportState() { + return containerReportManager.getContainerReportState(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java new file mode 100644 index 0000000..6c83c66 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces + .ContainerReportManager; +import org.apache.hadoop.util.Time; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; + +/** + * Class wraps the container report operations on datanode. + * // TODO: support incremental/delta container report + */ +public class ContainerReportManagerImpl implements ContainerReportManager { + // Last non-empty container report time + private long lastContainerReportTime; + private final long containerReportInterval; + private final long heartbeatInterval; + private AtomicLong reportCount; + private static final ReportState NO_CONTAINER_REPORTSTATE = + ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + + public ContainerReportManagerImpl(Configuration config) { + this.lastContainerReportTime = -1; + this.reportCount = new AtomicLong(0L); + this.containerReportInterval = config.getTimeDuration( + OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL, + OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + this.heartbeatInterval = getScmHeartbeatInterval(config); + } + + public ReportState getContainerReportState() { + if (lastContainerReportTime < 0) { + return getFullContainerReportState(); + } else { + // Add a random delay (0~30s) on top of the container report + // interval (60s) so tha the SCM is overwhelmed by the container reports + // sent in sync. + if (Time.monotonicNow() - lastContainerReportTime > + (containerReportInterval + getRandomReportDelay())) { + return getFullContainerReportState(); + } else { + return getNoContainerReportState(); + } + } + } + + private ReportState getFullContainerReportState() { + ReportState.Builder rsBuilder = ReportState.newBuilder(); + rsBuilder.setState(ReportState.states.completeContinerReport); + rsBuilder.setCount(reportCount.incrementAndGet()); + this.lastContainerReportTime = Time.monotonicNow(); + return rsBuilder.build(); + } + + private ReportState getNoContainerReportState() { + return NO_CONTAINER_REPORTSTATE; + } + + private long getRandomReportDelay() { + return RandomUtils.nextLong(0, heartbeatInterval); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java new file mode 100644 index 0000000..5577323 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class represents the state of a container. if the + * container reading encountered an error when we boot up we will post that + * info to a recovery queue and keep the info in the containerMap. + * <p/> + * if and when the issue is fixed, the expectation is that this entry will be + * deleted by the recovery thread from the containerMap and will insert entry + * instead of modifying this class. + */ +public class ContainerStatus { + private final ContainerData containerData; + + /** + * Number of pending deletion blocks in container. + */ + private int numPendingDeletionBlocks; + + private AtomicLong readBytes; + + private AtomicLong writeBytes; + + private AtomicLong readCount; + + private AtomicLong writeCount; + + /** + * Creates a Container Status class. + * + * @param containerData - ContainerData. + */ + ContainerStatus(ContainerData containerData) { + this.numPendingDeletionBlocks = 0; + this.containerData = containerData; + this.readCount = new AtomicLong(0L); + this.readBytes = new AtomicLong(0L); + this.writeCount = new AtomicLong(0L); + this.writeBytes = new AtomicLong(0L); + } + + /** + * Returns container if it is active. It is not active if we have had an + * error and we are waiting for the background threads to fix the issue. + * + * @return ContainerData. + */ + public ContainerData getContainer() { + return containerData; + } + + /** + * Increase the count of pending deletion blocks. + * + * @param numBlocks increment number + */ + public void incrPendingDeletionBlocks(int numBlocks) { + this.numPendingDeletionBlocks += numBlocks; + } + + /** + * Decrease the count of pending deletion blocks. + * + * @param numBlocks decrement number + */ + public void decrPendingDeletionBlocks(int numBlocks) { + this.numPendingDeletionBlocks -= numBlocks; + } + + /** + * Get the number of pending deletion blocks. + */ + public int getNumPendingDeletionBlocks() { + return this.numPendingDeletionBlocks; + } + + /** + * Get the number of bytes read from the container. + * @return the number of bytes read from the container. + */ + public long getReadBytes() { + return readBytes.get(); + } + + /** + * Increase the number of bytes read from the container. + * @param bytes number of bytes read. + */ + public void incrReadBytes(long bytes) { + this.readBytes.addAndGet(bytes); + } + + /** + * Get the number of times the container is read. + * @return the number of times the container is read. + */ + public long getReadCount() { + return readCount.get(); + } + + /** + * Increase the number of container read count by 1. + */ + public void incrReadCount() { + this.readCount.incrementAndGet(); + } + + /** + * Get the number of bytes write into the container. + * @return the number of bytes write into the container. + */ + public long getWriteBytes() { + return writeBytes.get(); + } + + /** + * Increase the number of bytes write into the container. + * @param bytes the number of bytes write into the container. + */ + public void incrWriteBytes(long bytes) { + this.writeBytes.addAndGet(bytes); + } + + /** + * Get the number of writes into the container. + * @return the number of writes into the container. + */ + public long getWriteCount() { + return writeCount.get(); + } + + /** + * Increase the number of writes into the container by 1. + */ + public void incrWriteCount() { + this.writeCount.incrementAndGet(); + } + + /** + * Get the number of bytes used by the container. + * @return the number of bytes used by the container. + */ + public long getBytesUsed() { + return containerData.getBytesUsed(); + } + + /** + * Increase the number of bytes used by the container. + * @param used number of bytes used by the container. + * @return the current number of bytes used by the container afert increase. + */ + public long incrBytesUsed(long used) { + return containerData.addBytesUsed(used); + } + + /** + * Set the number of bytes used by the container. + * @param used the number of bytes used by the container. + */ + public void setBytesUsed(long used) { + containerData.setBytesUsed(used); + } + + /** + * Decrease the number of bytes used by the container. + * @param reclaimed the number of bytes reclaimed from the container. + * @return the current number of bytes used by the container after decrease. + */ + public long decrBytesUsed(long reclaimed) { + return this.containerData.addBytesUsed(-1L * reclaimed); + } + + /** + * Get the maximum container size. + * @return the maximum container size. + */ + public long getMaxSize() { + return containerData.getMaxSize(); + } + + /** + * Set the maximum container size. + * @param size the maximum container size. + */ + public void setMaxSize(long size) { + this.containerData.setMaxSize(size); + } + + /** + * Get the number of keys in the container. + * @return the number of keys in the container. + */ + public long getNumKeys() { + return containerData.getKeyCount(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java new file mode 100644 index 0000000..7293895 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CachingGetSpaceUsed; +import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.GetSpaceUsed; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.util.Scanner; + +import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY; + +/** + * Class that wraps the space usage of the Datanode Container Storage Location + * by SCM containers. + */ +public class ContainerStorageLocation { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerStorageLocation.class); + + private static final String DU_CACHE_FILE = "scmUsed"; + private volatile boolean scmUsedSaved = false; + + private final StorageLocation dataLocation; + private final String storageUuId; + private final DF usage; + private final GetSpaceUsed scmUsage; + private final File scmUsedFile; + + public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf) + throws IOException { + this.dataLocation = dataLoc; + this.storageUuId = DatanodeStorage.generateUuid(); + File dataDir = Paths.get(dataLoc.getNormalizedUri()).resolve( + OzoneConsts.CONTAINER_PREFIX).toFile(); + // Initialize container data root if it does not exist as required by DF/DU + if (!dataDir.exists()) { + if (!dataDir.mkdirs()) { + LOG.error("Unable to create the container storage location at : {}", + dataDir); + throw new IllegalArgumentException("Unable to create the container" + + " storage location at : " + dataDir); + } + } + scmUsedFile = new File(dataDir, DU_CACHE_FILE); + // get overall disk usage + this.usage = new DF(dataDir, conf); + // get SCM specific usage + this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir) + .setConf(conf) + .setInitialUsed(loadScmUsed()) + .build(); + + // Ensure scm usage is saved during shutdown. + ShutdownHookManager.get().addShutdownHook( + new Runnable() { + @Override + public void run() { + if (!scmUsedSaved) { + saveScmUsed(); + } + } + }, SHUTDOWN_HOOK_PRIORITY); + } + + public URI getNormalizedUri() { + return dataLocation.getNormalizedUri(); + } + + public String getStorageUuId() { + return storageUuId; + } + public long getCapacity() { + long capacity = usage.getCapacity(); + return (capacity > 0) ? capacity : 0; + } + + public long getAvailable() throws IOException { + long remaining = getCapacity() - getScmUsed(); + long available = usage.getAvailable(); + if (remaining > available) { + remaining = available; + } + return (remaining > 0) ? remaining : 0; + } + + public long getScmUsed() throws IOException{ + return scmUsage.getUsed(); + } + + public void shutdown() { + saveScmUsed(); + scmUsedSaved = true; + + if (scmUsage instanceof CachingGetSpaceUsed) { + IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage)); + } + } + + /** + * Read in the cached DU value and return it if it is less than 600 seconds + * old (DU update interval). Slight imprecision of scmUsed is not critical + * and skipping DU can significantly shorten the startup time. + * If the cached value is not available or too old, -1 is returned. + */ + long loadScmUsed() { + long cachedScmUsed; + long mtime; + Scanner sc; + + try { + sc = new Scanner(scmUsedFile, "UTF-8"); + } catch (FileNotFoundException fnfe) { + return -1; + } + + try { + // Get the recorded scmUsed from the file. + if (sc.hasNextLong()) { + cachedScmUsed = sc.nextLong(); + } else { + return -1; + } + // Get the recorded mtime from the file. + if (sc.hasNextLong()) { + mtime = sc.nextLong(); + } else { + return -1; + } + + // Return the cached value if mtime is okay. + if (mtime > 0 && (Time.now() - mtime < 600000L)) { + LOG.info("Cached ScmUsed found for {} : {} ", dataLocation, + cachedScmUsed); + return cachedScmUsed; + } + return -1; + } finally { + sc.close(); + } + } + + /** + * Write the current scmUsed to the cache file. + */ + void saveScmUsed() { + if (scmUsedFile.exists() && !scmUsedFile.delete()) { + LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation); + } + OutputStreamWriter out = null; + try { + long used = getScmUsed(); + if (used > 0) { + out = new OutputStreamWriter(new FileOutputStream(scmUsedFile), + StandardCharsets.UTF_8); + // mtime is written last, so that truncated writes won't be valid. + out.write(Long.toString(used) + " " + Long.toString(Time.now())); + out.flush(); + out.close(); + out = null; + } + } catch (IOException ioe) { + // If write failed, the volume might be bad. Since the cache file is + // not critical, log the error and continue. + LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe); + } finally { + IOUtils.cleanupWithLogger(null, out); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org