HDDS-328. Support export and import of the KeyValueContainer. Contributed by Elek Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca29fb75 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca29fb75 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca29fb75 Branch: refs/heads/HDFS-12943 Commit: ca29fb754e8a162edba380a5f1deb48699e14d8b Parents: 585ebd8 Author: Xiaoyu Yao <x...@apache.org> Authored: Thu Aug 23 11:30:28 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Thu Aug 23 11:30:28 2018 -0700 ---------------------------------------------------------------------- .../common/impl/ContainerDataYaml.java | 94 ++++--- .../container/common/interfaces/Container.java | 33 ++- .../common/interfaces/ContainerPacker.java | 58 +++++ .../container/keyvalue/KeyValueContainer.java | 128 ++++++++-- .../container/keyvalue/KeyValueHandler.java | 19 +- .../container/keyvalue/TarContainerPacker.java | 249 +++++++++++++++++++ .../keyvalue/helpers/KeyValueContainerUtil.java | 22 +- .../keyvalue/TestKeyValueContainer.java | 95 ++++++- .../keyvalue/TestTarContainerPacker.java | 231 +++++++++++++++++ 9 files changed, 849 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java index aed75d3..ec6d642 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java @@ -18,31 +18,34 @@ package org.apache.hadoop.ozone.container.common.impl; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerType; -import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; - -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; - import java.beans.IntrospectionException; -import java.io.IOException; -import java.io.InputStream; -import java.io.Writer; +import java.io.ByteArrayInputStream; +import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStreamWriter; -import java.io.File; +import java.io.Writer; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.Map; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerType; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; + +import com.google.common.base.Preconditions; +import static org.apache.hadoop.ozone.container.keyvalue 
+ .KeyValueContainerData.KEYVALUE_YAML_TAG; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.AbstractConstruct; import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.introspector.BeanAccess; @@ -54,9 +57,6 @@ import org.yaml.snakeyaml.nodes.ScalarNode; import org.yaml.snakeyaml.nodes.Tag; import org.yaml.snakeyaml.representer.Representer; -import static org.apache.hadoop.ozone.container.keyvalue - .KeyValueContainerData.KEYVALUE_YAML_TAG; - /** * Class for creating and reading .container files. */ @@ -106,36 +106,52 @@ public final class ContainerDataYaml { /** * Read the yaml file, and return containerData. * - * @param containerFile * @throws IOException */ public static ContainerData readContainerFile(File containerFile) throws IOException { Preconditions.checkNotNull(containerFile, "containerFile cannot be null"); + try (FileInputStream inputFileStream = new FileInputStream(containerFile)) { + return readContainer(inputFileStream); + } + + } + + /** + * Read the yaml file content, and return containerData. + * + * @throws IOException + */ + public static ContainerData readContainer(byte[] containerFileContent) + throws IOException { + return readContainer( + new ByteArrayInputStream(containerFileContent)); + } + + /** + * Read the yaml content, and return containerData. + * + * @throws IOException + */ + public static ContainerData readContainer(InputStream input) + throws IOException { - InputStream input = null; ContainerData containerData; - try { - PropertyUtils propertyUtils = new PropertyUtils(); - propertyUtils.setBeanAccess(BeanAccess.FIELD); - propertyUtils.setAllowReadOnlyProperties(true); + PropertyUtils propertyUtils = new PropertyUtils(); + propertyUtils.setBeanAccess(BeanAccess.FIELD); + propertyUtils.setAllowReadOnlyProperties(true); - Representer representer = new ContainerDataRepresenter(); - representer.setPropertyUtils(propertyUtils); + Representer representer = new ContainerDataRepresenter(); + representer.setPropertyUtils(propertyUtils); - Constructor containerDataConstructor = new ContainerDataConstructor(); + Constructor containerDataConstructor = new ContainerDataConstructor(); - Yaml yaml = new Yaml(containerDataConstructor, representer); - yaml.setBeanAccess(BeanAccess.FIELD); + Yaml yaml = new Yaml(containerDataConstructor, representer); + yaml.setBeanAccess(BeanAccess.FIELD); + + containerData = (ContainerData) + yaml.load(input); - input = new FileInputStream(containerFile); - containerData = (ContainerData) - yaml.load(input); - } finally { - if (input!= null) { - input.close(); - } - } return containerData; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 7f706b5..9380f0c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -18,26 +18,27 @@ package org.apache.hadoop.ozone.container.common.interfaces; +import java.io.File; +import 
java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerLifeCycleState; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers. - StorageContainerException; import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import java.io.File; -import java.io.IOException; -import java.util.Map; - - /** * Interface for Container Operations. */ -public interface Container extends RwLock { +public interface Container<CONTAINERDATA extends ContainerData> extends RwLock { /** * Creates a container. @@ -71,7 +72,7 @@ public interface Container extends RwLock { * @return ContainerData - Container Data. * @throws StorageContainerException */ - ContainerData getContainerData(); + CONTAINERDATA getContainerData(); /** * Get the Container Lifecycle state. @@ -113,6 +114,20 @@ public interface Container extends RwLock { BlockIterator blockIterator() throws IOException; /** + * Import the container from an external archive. + */ + void importContainerData(InputStream stream, + ContainerPacker<CONTAINERDATA> packer) throws IOException; + + /** + * Export all the data of the container to one output archive with the help + * of the packer. + * + */ + void exportContainerData(OutputStream stream, + ContainerPacker<CONTAINERDATA> packer) throws IOException; + + /** * Returns containerReport for the container. */ StorageContainerDatanodeProtocolProtos.ContainerInfo getContainerReport() http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java new file mode 100644 index 0000000..8308c23 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.ozone.container.common.impl.ContainerData; + +/** + * Service to pack/unpack container data (ContainerData) to/from a single byte + * stream. + */ +public interface ContainerPacker<CONTAINERDATA extends ContainerData> { + + /** + * Extract the container data to the path defined by the container. + * <p> + * This does not include the extraction of the container descriptor file. + * + * @return the byte content of the descriptor (which won't be written to a + * file but returned). + */ + byte[] unpackContainerData(Container<CONTAINERDATA> container, + InputStream inputStream) + throws IOException; + + /** + * Compress all the container data (chunk data, metadata db AND container + * descriptor) into a single archive. + */ + void pack(Container<CONTAINERDATA> container, OutputStream destination) + throws IOException; + + /** + * Read the descriptor from the finished archive to get the data before + * importing the container. + */ + byte[] unpackContainerDescriptor(InputStream inputStream) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 0ea748a..8108a11 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -18,9 +18,15 @@ package org.apache.hadoop.ozone.container.keyvalue; -import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileUtil; @@ -37,32 +43,27 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers .KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import
org.apache.hadoop.utils.MetadataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CONTAINER_ALREADY_EXISTS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CONTAINER_FILES_CREATE_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.DISK_OUT_OF_SPACE; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.ERROR_IN_COMPACT_DB; @@ -70,11 +71,13 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.INVALID_CONTAINER_STATE; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.UNSUPPORTED_REQUEST; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to perform KeyValue Container operations. */ -public class KeyValueContainer implements Container { +public class KeyValueContainer implements Container<KeyValueContainerData> { private static final Logger LOG = LoggerFactory.getLogger(Container.class); @@ -167,6 +170,34 @@ public class KeyValueContainer implements Container { } /** + * Set all of the path-related container data fields based on the naming + * conventions. + * + * @param scmId + * @param containerVolume + * @param hddsVolumeDir + */ + public void populatePathFields(String scmId, + HddsVolume containerVolume, String hddsVolumeDir) { + + long containerId = containerData.getContainerID(); + + File containerMetaDataPath = KeyValueContainerLocationUtil + .getContainerMetaDataPath(hddsVolumeDir, scmId, containerId); + + File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath( + hddsVolumeDir, scmId, containerId); + File dbFile = KeyValueContainerLocationUtil.getContainerDBFile( + containerMetaDataPath, containerId); + + //Set containerData for the KeyValueContainer. + containerData.setMetadataPath(containerMetaDataPath.getPath()); + containerData.setChunksPath(chunksPath.getPath()); + containerData.setDbFile(dbFile); + containerData.setVolume(containerVolume); + } + + /** * Writes to .container file. * * @param containerFile container file name @@ -334,6 +365,75 @@ public class KeyValueContainer implements Container { containerData.getContainerPath())); } + @Override + public void importContainerData(InputStream input, + ContainerPacker<KeyValueContainerData> packer) throws IOException { + writeLock(); + try { + if (getContainerFile().exists()) { + String errorMessage = String.format( + "Can't import container (cid=%d) data to a specific location" + + " as the container descriptor (%s) already exists.", + getContainerData().getContainerID(), + getContainerFile().getAbsolutePath()); + throw new IOException(errorMessage); + } + //copy the values from the input stream to the final destination + // directory.
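+ // The packer streams the chunk files and the metadata DB into the + // paths already populated on this container, and returns the raw bytes + // of the container descriptor instead of writing it to disk.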
+ byte[] descriptorContent = packer.unpackContainerData(this, input); + + Preconditions.checkNotNull(descriptorContent, + "Container descriptor is missing from the container archive: " + + getContainerData().getContainerID()); + + //now, we have extracted the container descriptor from the previous + //datanode. We can load it and update it with the current data + // (original metadata + current filepath fields) + KeyValueContainerData originalContainerData = + (KeyValueContainerData) ContainerDataYaml + .readContainer(descriptorContent); + + + containerData.setState(originalContainerData.getState()); + containerData + .setContainerDBType(originalContainerData.getContainerDBType()); + containerData.setBytesUsed(originalContainerData.getBytesUsed()); + + //rewriting the yaml file with new checksum calculation. + update(originalContainerData.getMetadata(), true); + + //fill the in-memory stat counters (key count, bytes used) + KeyValueContainerUtil.parseKVContainerData(containerData, config); + + } catch (Exception ex) { + //delete all the temporary data in case of any exception. + try { + FileUtils.deleteDirectory(new File(containerData.getMetadataPath())); + FileUtils.deleteDirectory(new File(containerData.getChunksPath())); + FileUtils.deleteQuietly(getContainerFile()); + } catch (Exception deleteex) { + LOG.error( + "Cannot clean up destination directories after a container import" + + " error (cid=" + + containerData.getContainerID() + ")", deleteex); + } + throw ex; + } finally { + writeUnlock(); + } + } + + @Override + public void exportContainerData(OutputStream destination, + ContainerPacker<KeyValueContainerData> packer) throws IOException { + if (getContainerData().getState() != ContainerLifeCycleState.CLOSED) { + throw new IllegalStateException( + "Only closed containers can be exported: ContainerId=" + + getContainerData().getContainerID()); + } + packer.pack(this, destination); + } + /** * Acquire read lock. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 9ddb474..29c359e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; import org.apache.hadoop.ozone.container.common.helpers.KeyData; @@ -162,7 +163,8 @@ public class KeyValueHandler extends Handler { return volumeChoosingPolicy; } /** - * Returns OpenContainerBlockMap instance + * Returns OpenContainerBlockMap instance.
+ * * @return OpenContainerBlockMap */ public OpenContainerBlockMap getOpenContainerBlockMap() { @@ -269,6 +271,19 @@ public class KeyValueHandler extends Handler { return ContainerUtils.getSuccessResponse(request); } + public void populateContainerPathFields(KeyValueContainer container, + long maxSize) throws IOException { + volumeSet.acquireLock(); + try { + HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet + .getVolumesList(), maxSize); + String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); + container.populatePathFields(scmID, containerVolume, hddsVolumeDir); + } finally { + volumeSet.releaseLock(); + } + } + /** * Handles Read Container Request. Returns the ContainerData as response. */ @@ -322,7 +337,7 @@ public class KeyValueHandler extends Handler { * Open containers cannot be deleted. * Holds writeLock on ContainerSet till the container is removed from * containerMap. On disk deletion of container files will happen - * asynchornously without the lock. + * asynchronously without the lock. */ ContainerCommandResponseProto handleDeleteContainer( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java new file mode 100644 index 0000000..13689a7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.keyvalue; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Collectors; + +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; + +import com.google.common.base.Preconditions; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.compressors.CompressorOutputStream; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.commons.io.IOUtils; + +/** + * Compress/uncompress KeyValueContainer data to a tar.gz archive. + */ +public class TarContainerPacker + implements ContainerPacker<KeyValueContainerData> { + + private static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS; + + private static final String DB_DIR_NAME = "db"; + + private static final String CONTAINER_FILE_NAME = "container.yaml"; + + + + /** + * Given an input stream (tar file), extract the data to the specified + * directories. + * + * @param container container which defines the destination structure. + * @param inputStream the input stream. + * @throws IOException + */ + @Override + public byte[] unpackContainerData(Container<KeyValueContainerData> container, + InputStream inputStream) + throws IOException { + byte[] descriptorFileContent = null; + try { + KeyValueContainerData containerData = container.getContainerData(); + CompressorInputStream compressorInputStream = + new CompressorStreamFactory() + .createCompressorInputStream(CompressorStreamFactory.GZIP, + inputStream); + + TarArchiveInputStream tarInput = + new TarArchiveInputStream(compressorInputStream); + + TarArchiveEntry entry = tarInput.getNextTarEntry(); + while (entry != null) { + String name = entry.getName(); + if (name.startsWith(DB_DIR_NAME + "/")) { + Path destinationPath = containerData.getDbFile().toPath() + .resolve(name.substring(DB_DIR_NAME.length() + 1)); + extractEntry(tarInput, entry.getSize(), destinationPath); + } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) { + Path destinationPath = Paths.get(containerData.getChunksPath()) + .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1)); + extractEntry(tarInput, entry.getSize(), destinationPath); + } else if (name.equals(CONTAINER_FILE_NAME)) { + //Don't extract the container file to disk here; only buffer its + //content, which is returned to the caller in a separate step + //(see also unpackContainerDescriptor).
+ descriptorFileContent = readEntry(tarInput, entry); + } else { + throw new IllegalArgumentException( + "Unknown entry in the tar file: " + name); + } + entry = tarInput.getNextTarEntry(); + } + return descriptorFileContent; + + } catch (CompressorException e) { + throw new IOException( + "Can't uncompress the given container: " + container + .getContainerData().getContainerID(), + e); + } + } + + private void extractEntry(TarArchiveInputStream tarInput, long size, + Path path) throws IOException { + Preconditions.checkNotNull(path, "Path element should not be null"); + Path parent = Preconditions.checkNotNull(path.getParent(), + "Path element should have a parent directory"); + Files.createDirectories(parent); + try (BufferedOutputStream bos = new BufferedOutputStream( + new FileOutputStream(path.toAbsolutePath().toString()))) { + int bufferSize = 1024; + byte[] buffer = new byte[bufferSize + 1]; + long remaining = size; + while (remaining > 0) { + int read = + tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize)); + if (read >= 0) { + remaining -= read; + bos.write(buffer, 0, read); + } else { + remaining = 0; + } + } + } + + } + + /** + * Given a container, include all the required container data/metadata + * in a tar file. + * + * @param container Container to archive (data + metadata). + * @param destination Destination tar file/stream. + * @throws IOException + */ + @Override + public void pack(Container<KeyValueContainerData> container, + OutputStream destination) + throws IOException { + + KeyValueContainerData containerData = container.getContainerData(); + + try (CompressorOutputStream gzippedOut = new CompressorStreamFactory() + .createCompressorOutputStream(CompressorStreamFactory.GZIP, + destination)) { + + try (ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream( + gzippedOut)) { + + includePath(containerData.getDbFile().toString(), DB_DIR_NAME, + archiveOutputStream); + + includePath(containerData.getChunksPath(), CHUNKS_DIR_NAME, + archiveOutputStream); + + includeFile(container.getContainerFile(), + CONTAINER_FILE_NAME, + archiveOutputStream); + } + } catch (CompressorException e) { + throw new IOException( + "Can't compress the container: " + containerData.getContainerID(), + e); + } + + } + + @Override + public byte[] unpackContainerDescriptor(InputStream inputStream) + throws IOException { + try { + CompressorInputStream compressorInputStream = + new CompressorStreamFactory() + .createCompressorInputStream(CompressorStreamFactory.GZIP, + inputStream); + + TarArchiveInputStream tarInput = + new TarArchiveInputStream(compressorInputStream); + + TarArchiveEntry entry = tarInput.getNextTarEntry(); + while (entry != null) { + String name = entry.getName(); + if (name.equals(CONTAINER_FILE_NAME)) { + return readEntry(tarInput, entry); + } + entry = tarInput.getNextTarEntry(); + } + + } catch (CompressorException e) { + throw new IOException( + "Can't read the container descriptor from the container archive", + e); + } + throw new IOException( + "Container descriptor is missing from the container archive."); + } + + private byte[] readEntry(TarArchiveInputStream tarInput, + TarArchiveEntry entry) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int bufferSize = 1024; + byte[] buffer = new byte[bufferSize + 1]; + long remaining = entry.getSize(); + while (remaining > 0) { + int read = + tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize)); + if (read < 0) { + //stop on an unexpected end of stream instead of mis-counting + break; + } + remaining -= read; + bos.write(buffer, 0, read); + } +
return bos.toByteArray(); + } + + private void includePath(String containerPath, String subdir, + ArchiveOutputStream archiveOutputStream) throws IOException { + + for (Path path : Files.list(Paths.get(containerPath)) + .collect(Collectors.toList())) { + + includeFile(path.toFile(), subdir + "/" + path.getFileName(), + archiveOutputStream); + } + } + + private void includeFile(File file, String entryName, + ArchiveOutputStream archiveOutputStream) throws IOException { + ArchiveEntry archiveEntry = + archiveOutputStream.createArchiveEntry(file, entryName); + archiveOutputStream.putArchiveEntry(archiveEntry); + try (FileInputStream fis = new FileInputStream(file)) { + IOUtils.copy(fis, archiveOutputStream); + } + archiveOutputStream.closeArchiveEntry(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index 2352cf6..ed4536f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.ozone.container.keyvalue.helpers; -import com.google.common.base.Preconditions; -import org.apache.commons.io.FileUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; @@ -32,16 +36,12 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStoreBuilder; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.Map; - /** * Class which defines utility methods for KeyValueContainer. 
*/ @@ -157,7 +157,7 @@ public final class KeyValueContainerUtil { * @throws IOException */ public static void parseKVContainerData(KeyValueContainerData kvContainerData, - OzoneConfiguration config) throws IOException { + Configuration config) throws IOException { long containerID = kvContainerData.getContainerID(); File metadataPath = new File(kvContainerData.getMetadataPath()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index 6ff2eca..7359868 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -23,7 +23,8 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; - +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerLifeCycleState; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; @@ -37,6 +38,8 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.utils.MetadataStore; + +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -46,6 +49,8 @@ import org.mockito.Mockito; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -74,7 +79,6 @@ public class TestKeyValueContainer { private String scmId = UUID.randomUUID().toString(); private VolumeSet volumeSet; private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy; - private long containerID = 1L; private KeyValueContainerData keyValueContainerData; private KeyValueContainer keyValueContainer; @@ -141,13 +145,14 @@ public class TestKeyValueContainer { } + @SuppressWarnings("RedundantCast") @Test public void testCreateContainer() throws Exception { // Create Container. keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); - keyValueContainerData = (KeyValueContainerData) keyValueContainer + keyValueContainerData = keyValueContainer .getContainerData(); String containerMetaDataPath = keyValueContainerData @@ -167,6 +172,86 @@ public class TestKeyValueContainer { } @Test + public void testContainerImportExport() throws Exception { + + long containerId = keyValueContainer.getContainerData().getContainerID(); + // Create Container. 
+ keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); + + + keyValueContainerData = keyValueContainer + .getContainerData(); + + keyValueContainerData.setState(ContainerLifeCycleState.CLOSED); + + int numberOfKeysToWrite = 12; + //write a few keys to check the key count after import + MetadataStore metadataStore = KeyUtils.getDB(keyValueContainerData, conf); + for (int i = 0; i < numberOfKeysToWrite; i++) { + metadataStore.put(("test" + i).getBytes(), "test".getBytes()); + } + metadataStore.close(); + + Map<String, String> metadata = new HashMap<>(); + metadata.put("key1", "value1"); + keyValueContainer.update(metadata, true); + + //destination path + File folderToExport = folder.newFile("exported.tar.gz"); + + TarContainerPacker packer = new TarContainerPacker(); + + //export the container + try (FileOutputStream fos = new FileOutputStream(folderToExport)) { + keyValueContainer + .exportContainerData(fos, packer); + } + + //delete the original one + keyValueContainer.delete(true); + + //create a new one + KeyValueContainerData containerData = + new KeyValueContainerData(containerId, 1, + keyValueContainerData.getMaxSizeGB()); + KeyValueContainer container = new KeyValueContainer(containerData, conf); + + HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet + .getVolumesList(), 1); + String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); + + container.populatePathFields(scmId, containerVolume, hddsVolumeDir); + try (FileInputStream fis = new FileInputStream(folderToExport)) { + container.importContainerData(fis, packer); + } + + Assert.assertEquals("value1", containerData.getMetadata().get("key1")); + Assert.assertEquals(keyValueContainerData.getContainerDBType(), + containerData.getContainerDBType()); + Assert.assertEquals(keyValueContainerData.getState(), + containerData.getState()); + Assert.assertEquals(numberOfKeysToWrite, + containerData.getKeyCount()); + Assert.assertEquals(keyValueContainerData.getLayOutVersion(), + containerData.getLayOutVersion()); + Assert.assertEquals(keyValueContainerData.getMaxSizeGB(), + containerData.getMaxSizeGB()); + Assert.assertEquals(keyValueContainerData.getBytesUsed(), + containerData.getBytesUsed()); + + //Can't overwrite existing container + try { + try (FileInputStream fis = new FileInputStream(folderToExport)) { + container.importContainerData(fis, packer); + } + fail("Container was imported twice; existing files were overwritten"); + } catch (Exception ex) { + //all good + } + + } + + @Test public void testDuplicateContainer() throws Exception { try { // Create Container.
@@ -224,7 +309,7 @@ public class TestKeyValueContainer { keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); keyValueContainer.close(); - keyValueContainerData = (KeyValueContainerData) keyValueContainer + keyValueContainerData = keyValueContainer .getContainerData(); assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, @@ -249,7 +334,7 @@ public class TestKeyValueContainer { metadata.put("OWNER", "hdfs"); keyValueContainer.update(metadata, true); - keyValueContainerData = (KeyValueContainerData) keyValueContainer + keyValueContainerData = keyValueContainer .getContainerData(); assertEquals(2, keyValueContainerData.getMetadata().size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java new file mode 100644 index 0000000..a599f72 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the tar/untar for a given container. 
+ */ +public class TestTarContainerPacker { + + private static final String TEST_DB_FILE_NAME = "test1"; + + private static final String TEST_DB_FILE_CONTENT = "test1"; + + private static final String TEST_CHUNK_FILE_NAME = "chunk1"; + + private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk"; + + private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor"; + + private ContainerPacker packer = new TarContainerPacker(); + + private static final Path SOURCE_CONTAINER_ROOT = + Paths.get("target/test/data/packer-source-dir"); + + private static final Path DEST_CONTAINER_ROOT = + Paths.get("target/test/data/packer-dest-dir"); + + @BeforeClass + public static void init() throws IOException { + initDir(SOURCE_CONTAINER_ROOT); + initDir(DEST_CONTAINER_ROOT); + } + + private static void initDir(Path path) throws IOException { + if (path.toFile().exists()) { + FileUtils.deleteDirectory(path.toFile()); + } + path.toFile().mkdirs(); + } + + private KeyValueContainerData createContainer(long id, Path dir, + OzoneConfiguration conf) throws IOException { + + Path containerDir = dir.resolve("container" + id); + Path dbDir = containerDir.resolve("db"); + Path dataDir = containerDir.resolve("data"); + Files.createDirectories(dbDir); + Files.createDirectories(dataDir); + + KeyValueContainerData containerData = new KeyValueContainerData(id, -1); + containerData.setChunksPath(dataDir.toString()); + containerData.setMetadataPath(dbDir.getParent().toString()); + containerData.setDbFile(dbDir.toFile()); + + + return containerData; + } + + @Test + public void pack() throws IOException, CompressorException { + + //GIVEN + OzoneConfiguration conf = new OzoneConfiguration(); + + KeyValueContainerData sourceContainerData = + createContainer(1L, SOURCE_CONTAINER_ROOT, conf); + + KeyValueContainer sourceContainer = + new KeyValueContainer(sourceContainerData, conf); + + //sample db file in the metadata directory + try (FileWriter writer = new FileWriter( + sourceContainerData.getDbFile().toPath() + .resolve(TEST_DB_FILE_NAME) + .toFile())) { + IOUtils.write(TEST_DB_FILE_CONTENT, writer); + } + + //sample chunk file in the chunk directory + try (FileWriter writer = new FileWriter( + Paths.get(sourceContainerData.getChunksPath()) + .resolve(TEST_CHUNK_FILE_NAME) + .toFile())) { + IOUtils.write(TEST_CHUNK_FILE_CONTENT, writer); + } + + //sample container descriptor file + try (FileWriter writer = new FileWriter( + sourceContainer.getContainerFile())) { + IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer); + } + + Path targetFile = + SOURCE_CONTAINER_ROOT.getParent().resolve("container.tar.gz"); + + //WHEN: pack it + try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) { + packer.pack(sourceContainer, output); + } + + //THEN: check the result + try (FileInputStream input = new FileInputStream(targetFile.toFile())) { + CompressorInputStream uncompressed = new CompressorStreamFactory() + .createCompressorInputStream(CompressorStreamFactory.GZIP, input); + TarArchiveInputStream tarStream = new TarArchiveInputStream(uncompressed); + + TarArchiveEntry entry; + Map<String, TarArchiveEntry> entries = new HashMap<>(); + while ((entry = tarStream.getNextTarEntry()) != null) { + entries.put(entry.getName(), entry); + } + + Assert.assertTrue( + entries.containsKey("container.yaml")); + + } + + //read the container descriptor only + try (FileInputStream input = new FileInputStream(targetFile.toFile())) { + String containerYaml = new String(packer.unpackContainerDescriptor(input), + 
Charset.forName(StandardCharsets.UTF_8.name())); + Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml); + } + + KeyValueContainerData destinationContainerData = + createContainer(2L, DEST_CONTAINER_ROOT, conf); + + KeyValueContainer destinationContainer = + new KeyValueContainer(destinationContainerData, conf); + + String descriptor = ""; + + //unpackContainerData + try (FileInputStream input = new FileInputStream(targetFile.toFile())) { + descriptor = + new String(packer.unpackContainerData(destinationContainer, input), + Charset.forName(StandardCharsets.UTF_8.name())); + } + + assertExampleMetadataDbIsGood( + destinationContainerData.getDbFile().toPath()); + assertExampleChunkFileIsGood( + Paths.get(destinationContainerData.getChunksPath())); + Assert.assertFalse( + "Descriptor file should not have been extracted by the " + + "unpackContainerData call", + destinationContainer.getContainerFile().exists()); + Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor); + + } + + + private void assertExampleMetadataDbIsGood(Path dbPath) + throws IOException { + + Path dbFile = dbPath.resolve(TEST_DB_FILE_NAME); + + Assert.assertTrue( + "example DB file is missing after pack/unpackContainerData: " + dbFile, + Files.exists(dbFile)); + + try (FileInputStream testFile = new FileInputStream(dbFile.toFile())) { + List<String> strings = IOUtils + .readLines(testFile, Charset.forName(StandardCharsets.UTF_8.name())); + Assert.assertEquals(1, strings.size()); + Assert.assertEquals(TEST_DB_FILE_CONTENT, strings.get(0)); + } + } + + private void assertExampleChunkFileIsGood(Path chunkDirPath) + throws IOException { + + Path chunkFile = chunkDirPath.resolve(TEST_CHUNK_FILE_NAME); + + Assert.assertTrue( + "example chunk file is missing after pack/unpackContainerData: " + + chunkFile, + Files.exists(chunkFile)); + + try (FileInputStream testFile = new FileInputStream(chunkFile.toFile())) { + List<String> strings = IOUtils + .readLines(testFile, Charset.forName(StandardCharsets.UTF_8.name())); + Assert.assertEquals(1, strings.size()); + Assert.assertEquals(TEST_CHUNK_FILE_CONTENT, strings.get(0)); + } + } + +} \ No newline at end of file
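For reference, a minimal usage sketch of the API introduced by this patch; it is not part of the commit, the sourceContainer/destinationContainer/archiveFile variables are illustrative placeholders, and the flow mirrors testContainerImportExport above (a closed source container, and a destination container whose path fields were populated via populatePathFields):

    // Export a CLOSED container to a tar.gz archive, then re-import it into
    // a freshly created container, e.g. on another datanode.
    ContainerPacker<KeyValueContainerData> packer = new TarContainerPacker();

    // exportContainerData throws IllegalStateException unless the container
    // state is CLOSED.
    try (OutputStream out = new FileOutputStream(archiveFile)) {
      sourceContainer.exportContainerData(out, packer);
    }

    // importContainerData extracts the chunks and the metadata DB, restores
    // state, DB type, bytes-used and metadata from the archived
    // container.yaml, rewrites the descriptor with a fresh checksum, and
    // reloads the in-memory key count. It fails with IOException if the
    // destination descriptor already exists.
    try (InputStream in = new FileInputStream(archiveFile)) {
      destinationContainer.importContainerData(in, packer);
    }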