http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java deleted file mode 100644 index fb58a4e..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.primitives.Longs; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; - -/** - * Tests for Cblock read write functionality. - */ -public class TestCBlockReadWrite { - private final static long GB = 1024 * 1024 * 1024; - private final static int KB = 1024; - private static MiniOzoneCluster cluster; - private static OzoneConfiguration config; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockReadWrite.class.getSimpleName()); - config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - config.setBoolean(DFS_CBLOCK_TRACE_IO, true); - config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - cluster = new MiniOzoneClassicCluster.Builder(config) - .numDataNodes(1) - .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(config); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); - } - - /** - * getContainerPipelines creates a set of containers and returns the - * Pipelines that define those containers. - * - * @param count - Number of containers to create. - * @return - List of Pipelines. - * @throws IOException throws Exception - */ - private List<Pipeline> getContainerPipeline(int count) throws IOException { - List<Pipeline> containerPipelines = new LinkedList<>(); - for (int x = 0; x < count; x++) { - String traceID = "trace" + RandomStringUtils.randomNumeric(4); - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerName, "CBLOCK"); - XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); - ContainerProtocolCalls.createContainer(client, traceID); - // This step is needed since we set private data on pipelines, when we - // read the list from CBlockServer. So we mimic that action here. - pipeline.setData(Longs.toByteArray(x)); - containerPipelines.add(pipeline); - } - return containerPipelines; - } - - /** - * This test creates a cache and performs a simple write / read. - * The operations are done by bypassing the cache. - * - * @throws IOException - */ - @Test - public void testDirectIO() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration cConfig = new OzoneConfiguration(); - cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - String dataHash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(cConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Assert.assertFalse(cache.isShortCircuitIOEnabled()); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(1, metrics.getNumWriteOps()); - // Please note that this read is directly from remote container - LogicalBlock block = cache.get(blockID); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(0, metrics.getNumReadCacheHits()); - Assert.assertEquals(1, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); - - cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(2, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(2, metrics.getNumWriteOps()); - Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); - // Please note that this read is directly from remote container - block = cache.get(blockID + 1); - Assert.assertEquals(2, metrics.getNumReadOps()); - Assert.assertEquals(0, metrics.getNumReadCacheHits()); - Assert.assertEquals(2, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match.", dataHash, readHash); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - } - - /** - * This test writes some block to the cache and then shuts down the cache - * The cache is then restarted with "short.circuit.io" disable to check - * that the blocks are read correctly from the container. - * - * @throws IOException - */ - @Test - public void testContainerWrites() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockReadWrite.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 3, - TimeUnit.SECONDS); - XceiverClientManager xcm = new XceiverClientManager(flushTestConfig); - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - - int numUniqueBlocks = 4; - String[] data = new String[numUniqueBlocks]; - String[] dataHash = new String[numUniqueBlocks]; - for (int i = 0; i < numUniqueBlocks; i++) { - data[i] = RandomStringUtils.random(4 * KB); - dataHash[i] = DigestUtils.sha256Hex(data[i]); - } - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xcm, metrics); - List<Pipeline> pipelines = getContainerPipeline(10); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xcm) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread flushListenerThread = new Thread(flusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - Assert.assertTrue(cache.isShortCircuitIOEnabled()); - // Write data to the cache - for (int i = 0; i < 512; i++) { - cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8)); - } - // Close the cache and flush the data to the containers - cache.close(); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(512, metrics.getNumWriteOps()); - Thread.sleep(3000); - flusher.shutdown(); - Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); - // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xcm) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Assert.assertFalse(newCache.isShortCircuitIOEnabled()); - // this read will be from the container, also match the hash - for (int i = 0; i < 512; i++) { - LogicalBlock block = newCache.get(i); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match, for index:" - + i, dataHash[i % numUniqueBlocks], readHash); - } - Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); - Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks()); - newCache.close(); - newFlusher.shutdown(); - } - - @Test - public void testRetryLog() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockReadWrite.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, - 3, - TimeUnit.SECONDS); - - int numblocks = 10; - flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - - List<Pipeline> fakeContainerPipelines = new LinkedList<>(); - PipelineChannel pipelineChannel = new PipelineChannel("fake", - LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, - "fake"); - Pipeline fakePipeline = new Pipeline("fake", pipelineChannel); - fakePipeline.setData(Longs.toByteArray(1)); - fakeContainerPipelines.add(fakePipeline); - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(fakeContainerPipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread flushListenerThread = new Thread(flusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - - for (int i = 0; i < numblocks; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(numblocks, metrics.getNumWriteOps()); - Thread.sleep(3000); - - // all the writes to the container will fail because of fake pipelines - Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead()); - Assert.assertTrue( - metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites()); - Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); - cache.close(); - flusher.shutdown(); - - // restart cache with correct pipelines, now blocks should be uploaded - // correctly - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Thread newFlushListenerThread = new Thread(newFlusher); - newFlushListenerThread.setDaemon(true); - newFlushListenerThread.start(); - Thread.sleep(3000); - Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks); - Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB()); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java deleted file mode 100644 index 386c9b2..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.client.ScmClient; -import org.apache.hadoop.cblock.util.MockStorageClient; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * This class tests the basics of CBlock server. Mainly about the four - * operations on volumes: create, delete, info and list. - */ -public class TestCBlockServer { - private static CBlockManager cBlockManager; - private static OzoneConfiguration conf; - - @Before - public void setup() throws Exception { - ScmClient storageClient = new MockStorageClient(); - conf = new OzoneConfiguration(); - conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - cBlockManager = new CBlockManager(conf, storageClient); - cBlockManager.start(); - } - - @After - public void clean() { - cBlockManager.stop(); - cBlockManager.join(); - cBlockManager.clean(); - } - - /** - * Test create volume for different users. - * @throws Exception - */ - @Test - public void testCreateVolume() throws Exception { - String userName1 = "user" + RandomStringUtils.randomNumeric(5); - String userName2 = "user" + RandomStringUtils.randomNumeric(5); - String volumeName1 = "volume" + RandomStringUtils.randomNumeric(5); - String volumeName2 = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize); - List<VolumeInfo> volumes = cBlockManager.listVolume(userName1); - assertEquals(1, volumes.size()); - VolumeInfo existingVolume = volumes.get(0); - assertEquals(userName1, existingVolume.getUserName()); - assertEquals(volumeName1, existingVolume.getVolumeName()); - assertEquals(volumeSize, existingVolume.getVolumeSize()); - assertEquals(blockSize, existingVolume.getBlockSize()); - - cBlockManager.createVolume(userName1, volumeName2, volumeSize, blockSize); - cBlockManager.createVolume(userName2, volumeName1, volumeSize, blockSize); - volumes = cBlockManager.listVolume(userName1); - assertEquals(2, volumes.size()); - volumes = cBlockManager.listVolume(userName2); - assertEquals(1, volumes.size()); - } - - /** - * Test delete volume. - * @throws Exception - */ - @Test - public void testDeleteVolume() throws Exception { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName1 = "volume" + RandomStringUtils.randomNumeric(5); - String volumeName2 = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize); - cBlockManager.createVolume(userName, volumeName2, volumeSize, blockSize); - cBlockManager.deleteVolume(userName, volumeName1, true); - List<VolumeInfo> volumes = cBlockManager.listVolume(userName); - assertEquals(1, volumes.size()); - - VolumeInfo existingVolume = volumes.get(0); - assertEquals(userName, existingVolume.getUserName()); - assertEquals(volumeName2, existingVolume.getVolumeName()); - assertEquals(volumeSize, existingVolume.getVolumeSize()); - assertEquals(blockSize, existingVolume.getBlockSize()); - } - - /** - * Test info volume. - * - * TODO : usage field is not being tested (as it is not implemented yet) - * @throws Exception - */ - @Test - public void testInfoVolume() throws Exception { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize); - VolumeInfo info = cBlockManager.infoVolume(userName, volumeName); - assertEquals(userName, info.getUserName()); - assertEquals(volumeName, info.getVolumeName()); - assertEquals(volumeSize, info.getVolumeSize()); - assertEquals(blockSize, info.getBlockSize()); - } - - /** - * Test listing a number of volumes. - * @throws Exception - */ - @Test - public void testListVolume() throws Exception { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName ="volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - int volumeNum = 100; - for (int i = 0; i<volumeNum; i++) { - cBlockManager.createVolume(userName, volumeName + i, - volumeSize, blockSize); - } - List<VolumeInfo> volumes = cBlockManager.listVolume(userName); - assertEquals(volumeNum, volumes.size()); - Set<String> volumeIds = new HashSet<>(); - for (int i = 0; i<volumeNum; i++) { - VolumeInfo volumeInfo = volumes.get(i); - assertEquals(userName, volumeInfo.getUserName()); - assertFalse(volumeIds.contains(volumeName + i)); - volumeIds.add(volumeName + i); - assertEquals(volumeSize, volumeInfo.getVolumeSize()); - assertEquals(blockSize, volumeInfo.getBlockSize()); - } - for (int i = 0; i<volumeNum; i++) { - assertTrue(volumeIds.contains(volumeName + i)); - } - } - - /** - * Test listing a number of volumes. - * @throws Exception - */ - @Test - public void testListVolumes() throws Exception { - String volumeName ="volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - int volumeNum = 100; - int userCount = 10; - - assertTrue("We need at least one volume for each user", - userCount < volumeNum); - - for (int i = 0; i<volumeNum; i++) { - String userName = - "user-" + (i % userCount); - cBlockManager.createVolume(userName, volumeName + i, - volumeSize, blockSize); - } - List<VolumeInfo> allVolumes = cBlockManager.listVolumes(); - //check if we have the volumes from all the users. - - Set<String> volumeIds = new HashSet<>(); - Set<String> usernames = new HashSet<>(); - for (int i = 0; i < allVolumes.size(); i++) { - VolumeInfo volumeInfo = allVolumes.get(i); - assertFalse(volumeIds.contains(volumeName + i)); - usernames.add(volumeInfo.getUserName()); - volumeIds.add(volumeName + i); - assertEquals(volumeSize, volumeInfo.getVolumeSize()); - assertEquals(blockSize, volumeInfo.getBlockSize()); - } - - assertEquals(volumeNum, volumeIds.size()); - for (int i = 0; i<volumeNum; i++) { - assertTrue(volumeIds.contains(volumeName + i)); - } - - assertEquals(userCount, usernames.size()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java deleted file mode 100644 index db13972..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import org.apache.hadoop.cblock.meta.VolumeDescriptor; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.client.ScmClient; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.cblock.util.MockStorageClient; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; -import static org.junit.Assert.assertEquals; - -/** - * Test the CBlock server state is maintained in persistent storage and can be - * recovered on CBlock server restart. - */ -public class TestCBlockServerPersistence { - - /** - * Test when cblock server fails with volume meta data, the meta data can be - * restored correctly. - * @throws Exception - */ - @Test - public void testWriteToPersistentStore() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - String userName = "testWriteToPersistentStore"; - String volumeName1 = "testVolume1"; - String volumeName2 = "testVolume2"; - long volumeSize1 = 30L*1024*1024*1024; - long volumeSize2 = 15L*1024*1024*1024; - int blockSize = 4096; - CBlockManager cBlockManager = null; - CBlockManager cBlockManager1 = null; - String path = GenericTestUtils - .getTempPath(TestCBlockServerPersistence.class.getSimpleName()); - File filePath = new File(path); - if(!filePath.exists() && !filePath.mkdirs()) { - throw new IOException("Unable to create test DB dir"); - } - conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( - "/testCblockPersistence.dat")); - try { - ScmClient storageClient = new MockStorageClient(); - conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - cBlockManager = new CBlockManager(conf, storageClient); - cBlockManager.start(); - cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize); - cBlockManager.createVolume(userName, volumeName2, volumeSize2, blockSize); - List<VolumeDescriptor> allVolumes = cBlockManager.getAllVolumes(); - // close the cblock server. Since meta data is written to disk on volume - // creation, closing server here is the same as a cblock server crash. - cBlockManager.close(); - cBlockManager.stop(); - cBlockManager.join(); - cBlockManager = null; - assertEquals(2, allVolumes.size()); - VolumeDescriptor volumeDescriptor1 = allVolumes.get(0); - VolumeDescriptor volumeDescriptor2 = allVolumes.get(1); - - // create a new cblock server instance. This is just the - // same as restarting cblock server. - ScmClient storageClient1 = new MockStorageClient(); - OzoneConfiguration conf1 = new OzoneConfiguration(); - conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( - "/testCblockPersistence.dat")); - conf1.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf1.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - cBlockManager1 = new CBlockManager(conf1, storageClient1); - cBlockManager1.start(); - List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes(); - assertEquals(2, allVolumes1.size()); - VolumeDescriptor newvolumeDescriptor1 = allVolumes1.get(0); - VolumeDescriptor newvolumeDescriptor2 = allVolumes1.get(1); - - // It seems levelDB iterator gets keys in the same order as keys - // are inserted, in which case the else clause should never happen. - // But still kept the second clause if it is possible to get different - // key ordering from leveldb. And we do not rely on the ordering of keys - // here. - if (volumeDescriptor1.getVolumeName().equals( - newvolumeDescriptor1.getVolumeName())) { - assertEquals(volumeDescriptor1.toString(), - newvolumeDescriptor1.toString()); - assertEquals(volumeDescriptor2.toString(), - newvolumeDescriptor2.toString()); - } else { - assertEquals(volumeDescriptor1.toString(), - newvolumeDescriptor2.toString()); - assertEquals(volumeDescriptor2.toString(), - newvolumeDescriptor1.toString()); - } - } finally { - if (cBlockManager != null) { - cBlockManager.clean(); - } - if (cBlockManager1 != null) { - cBlockManager1.close(); - cBlockManager1.stop(); - cBlockManager1.join(); - cBlockManager1.clean(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java deleted file mode 100644 index e1e2909..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ /dev/null @@ -1,444 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.primitives.Longs; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Time; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.lang.Math.abs; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_TRACE_IO; - -/** - * Tests for local cache. - */ -public class TestLocalBlockCache { - private static final Logger LOG = - LoggerFactory.getLogger(TestLocalBlockCache.class); - private final static long GB = 1024 * 1024 * 1024; - private final static int KB = 1024; - private static MiniOzoneCluster cluster; - private static OzoneConfiguration config; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestLocalBlockCache.class.getSimpleName()); - config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - config.setBoolean(DFS_CBLOCK_TRACE_IO, true); - config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - cluster = new MiniOzoneClassicCluster.Builder(config) - .numDataNodes(1) - .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(config); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); - } - - /** - * getContainerPipelines creates a set of containers and returns the - * Pipelines that define those containers. - * - * @param count - Number of containers to create. - * @return - List of Pipelines. - * @throws IOException throws Exception - */ - private List<Pipeline> getContainerPipeline(int count) throws IOException { - List<Pipeline> containerPipelines = new LinkedList<>(); - for (int x = 0; x < count; x++) { - String traceID = "trace" + RandomStringUtils.randomNumeric(4); - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerName, "CBLOCK"); - XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); - ContainerProtocolCalls.createContainer(client, traceID); - // This step is needed since we set private data on pipelines, when we - // read the list from CBlockServer. So we mimic that action here. - pipeline.setData(Longs.toByteArray(x)); - containerPipelines.add(pipeline); - xceiverClientManager.releaseClient(client); - } - return containerPipelines; - } - - /** - * This test creates a cache and performs a simple write / read. - * Due to the cache - we have Read-after-write consistency for cBlocks. - * - * @throws IOException throws Exception - */ - @Test - public void testCacheWriteRead() throws IOException, - InterruptedException, TimeoutException { - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - String dataHash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumWriteOps()); - // Please note that this read is from the local cache. - LogicalBlock block = cache.get(blockID); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(1, metrics.getNumReadCacheHits()); - Assert.assertEquals(0, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - - cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(2, metrics.getNumWriteOps()); - // Please note that this read is from the local cache. - block = cache.get(blockID + 1); - Assert.assertEquals(2, metrics.getNumReadOps()); - Assert.assertEquals(2, metrics.getNumReadCacheHits()); - Assert.assertEquals(0, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match.", dataHash, readHash); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - - } - - @Test - public void testCacheWriteToRemoteContainer() throws IOException, - InterruptedException, TimeoutException { - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - } - - @Test - public void testCacheWriteToRemote50KBlocks() throws IOException, - InterruptedException, TimeoutException { - final long totalBlocks = 50 * 1000; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * 1024) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - long startTime = Time.monotonicNow(); - for (long blockid = 0; blockid < totalBlocks; blockid++) { - cache.put(blockid, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(totalBlocks, metrics.getNumWriteOps()); - Assert.assertEquals(totalBlocks, metrics.getNumBlockBufferUpdates()); - LOG.info("Wrote 50K blocks, waiting for replication to finish."); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - long endTime = Time.monotonicNow(); - LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks, - TimeUnit.MILLISECONDS.toSeconds(endTime - startTime)); - // TODO: Read this data back. - cache.close(); - } - - @Test - public void testCacheInvalidBlock() throws IOException { - final int blockID = 1024; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - // Read a non-existent block ID. - LogicalBlock block = cache.get(blockID); - Assert.assertNotNull(block); - Assert.assertEquals(4 * 1024, block.getData().array().length); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(1, metrics.getNumReadLostBlocks()); - Assert.assertEquals(1, metrics.getNumReadCacheMiss()); - cache.close(); - } - - @Test - public void testReadWriteCorrectness() throws IOException, - InterruptedException, TimeoutException { - Random r = new Random(); - final int maxBlock = 12500000; - final int blockCount = 10 * 1000; - Map<Long, String> blockShaMap = new HashMap<>(); - List<Pipeline> pipelines = getContainerPipeline(10); - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - final CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - for (int x = 0; x < blockCount; x++) { - String data = RandomStringUtils.random(4 * 1024); - String dataHash = DigestUtils.sha256Hex(data); - long blockId = abs(r.nextInt(maxBlock)); - blockShaMap.put(blockId, dataHash); - cache.put(blockId, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(blockCount, metrics.getNumWriteOps()); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - LOG.info("Finished with putting blocks ..starting reading blocks back. " + - "unique blocks : {}", blockShaMap.size()); - // Test reading from local cache. - for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) { - LogicalBlock block = cache.get(entry.getKey()); - String blockSha = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("Block data is not equal", entry.getValue(), - blockSha); - } - Assert.assertEquals(blockShaMap.size(), metrics.getNumReadOps()); - Assert.assertEquals(blockShaMap.size(), metrics.getNumReadCacheHits()); - Assert.assertEquals(0, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - - LOG.info("Finished with reading blocks, SUCCESS."); - // Close and discard local cache. - cache.close(); - LOG.info("Closing the and destroying local cache"); - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newflusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, newMetrics); - Assert.assertEquals(0, newMetrics.getNumReadCacheHits()); - CBlockLocalCache newCache = null; - try { - newCache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newflusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) { - LogicalBlock block = newCache.get(entry.getKey()); - String blockSha = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("Block data is not equal", entry.getValue(), - blockSha); - } - - Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadOps()); - Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadCacheHits()); - Assert.assertEquals(0, newMetrics.getNumReadCacheMiss()); - Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); - - LOG.info("Finished with reading blocks from remote cache, SUCCESS."); - } finally { - if (newCache != null) { - newCache.close(); - } - } - } - - @Test - public void testStorageImplReadWrite() throws IOException, - InterruptedException, TimeoutException { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 50L * (1024L * 1024L * 1024L); - int blockSize = 4096; - byte[] data = - RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024)) - .getBytes(StandardCharsets.UTF_8); - String hash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() - .setUserName(userName) - .setVolumeName(volumeName) - .setVolumeSize(volumeSize) - .setBlockSize(blockSize) - .setContainerList(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setConf(this.config) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - ozoneStore.write(data, 0); - - byte[] newData = new byte[10 * 1024 * 1024]; - ozoneStore.read(newData, 0); - String newHash = DigestUtils.sha256Hex(newData); - Assert.assertEquals("hashes don't match.", hash, newHash); - GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(), - 100, 20 * 1000); - ozoneStore.close(); - } - - //@Test - // Disabling this test for time being since the bug in JSCSI - // forces us always to have a local cache. - public void testStorageImplNoLocalCache() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration oConfig = new OzoneConfiguration(); - oConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - oConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 50L * (1024L * 1024L * 1024L); - int blockSize = 4096; - byte[] data = - RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024)) - .getBytes(StandardCharsets.UTF_8); - String hash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig, - xceiverClientManager, metrics); - CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() - .setUserName(userName) - .setVolumeName(volumeName) - .setVolumeSize(volumeSize) - .setBlockSize(blockSize) - .setContainerList(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setConf(oConfig) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - ozoneStore.write(data, 0); - - byte[] newData = new byte[10 * 1024 * 1024]; - ozoneStore.read(newData, 0); - String newHash = DigestUtils.sha256Hex(newData); - Assert.assertEquals("hashes don't match.", hash, newHash); - GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(), - 100, 20 * 1000); - ozoneStore.close(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java deleted file mode 100644 index 0268ccc..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.kubernetes; - -import io.kubernetes.client.JSON; -import io.kubernetes.client.models.V1PersistentVolume; -import io.kubernetes.client.models.V1PersistentVolumeClaim; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_IP; -import org.junit.Assert; -import org.junit.Test; - -import java.nio.file.Files; -import java.nio.file.Paths; - -import org.apache.hadoop.hdds.conf.OzoneConfiguration; - -/** - * Test the resource generation of Dynamic Provisioner. - */ -public class TestDynamicProvisioner { - - @Test - public void persitenceVolumeBuilder() throws Exception { - - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setStrings(DFS_CBLOCK_ISCSI_ADVERTISED_IP, "1.2.3.4"); - - DynamicProvisioner provisioner = - new DynamicProvisioner(conf, null); - - String pvc = new String(Files.readAllBytes( - Paths.get(getClass().getResource( - "/dynamicprovisioner/input1-pvc.json").toURI()))); - - String pv = new String(Files.readAllBytes( - Paths.get(getClass().getResource( - "/dynamicprovisioner/expected1-pv.json").toURI()))); - - JSON json = new io.kubernetes.client.JSON(); - - V1PersistentVolumeClaim claim = - json.getGson().fromJson(pvc, V1PersistentVolumeClaim.class); - - String volumeName = provisioner.createVolumeName(claim); - - V1PersistentVolume volume = - provisioner.persitenceVolumeBuilder(claim, volumeName); - - //remove the data which should not been compared - V1PersistentVolume expectedVolume = - json.getGson().fromJson(pv, V1PersistentVolume.class); - - - Assert.assertEquals(expectedVolume, volume); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java deleted file mode 100644 index d7dabe3..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.util; - -import org.apache.hadoop.cblock.meta.ContainerDescriptor; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; - -/** - * NOTE : This class is only for testing purpose. - * - * Mock an underlying container storage layer, expose to CBlock to perform - * IO. While in this mock implementation, a container is nothing more than - * a in memory hashmap. - * - * This is to allow volume creation call and perform standalone tests. - */ -public final class ContainerLookUpService { - private static ConcurrentHashMap<String, ContainerDescriptor> - containers = new ConcurrentHashMap<>(); - - /** - * Return an *existing* container with given Id. - * - * TODO : for testing purpose, return a new container if the given Id - * is not found - * - * found - * @param containerID - * @return the corresponding pipeline instance (create if not exist) - */ - public static ContainerDescriptor lookUp(String containerID) - throws IOException { - if (!containers.containsKey(containerID)) { - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline( - containerID); - ContainerDescriptor cd = new ContainerDescriptor(containerID); - cd.setPipeline(pipeline); - containers.put(containerID, cd); - } - return containers.get(containerID); - } - - public static void addContainer(String containerID) throws IOException { - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline( - containerID); - ContainerDescriptor cd = new ContainerDescriptor(containerID); - cd.setPipeline(pipeline); - containers.put(containerID, cd); - } - - private ContainerLookUpService() { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java deleted file mode 100644 index 9fa76a8..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.util; - -import org.apache.hadoop.cblock.meta.ContainerDescriptor; -import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.client.ScmClient; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * This class is the one that directly talks to SCM server. - * - * NOTE : this is only a mock class, only to allow testing volume - * creation without actually creating containers. In real world, need to be - * replaced with actual container look up calls. - * - */ -public class MockStorageClient implements ScmClient { - private static AtomicInteger currentContainerId = - new AtomicInteger(0); - - /** - * Ask SCM to get a exclusive container. - * - * @return A container descriptor object to locate this container - * @throws Exception - */ - @Override - public Pipeline createContainer(String containerId, String owner) - throws IOException { - int contId = currentContainerId.getAndIncrement(); - ContainerLookUpService.addContainer(Long.toString(contId)); - return ContainerLookUpService.lookUp(Long.toString(contId)) - .getPipeline(); - } - - /** - * As this is only a testing class, with all "container" maintained in - * memory, no need to really delete anything for now. - * @throws IOException - */ - @Override - public void deleteContainer(Pipeline pipeline, boolean force) - throws IOException { - - } - - /** - * This is a mock class, so returns the container infos of start container - * and end container. - * - * @param startName start container name. - * @param prefixName prefix container name. - * @param count count. - * @return a list of pipeline. - * @throws IOException - */ - @Override - public List<ContainerInfo> listContainer(String startName, - String prefixName, int count) throws IOException { - List<ContainerInfo> containerList = new ArrayList<>(); - ContainerDescriptor containerDescriptor = - ContainerLookUpService.lookUp(startName); - ContainerInfo container = new ContainerInfo.Builder() - .setContainerName(containerDescriptor.getContainerID()) - .setPipeline(containerDescriptor.getPipeline()) - .setState(HddsProtos.LifeCycleState.ALLOCATED) - .build(); - containerList.add(container); - return containerList; - } - - /** - * Create a instance of ContainerData by a given container id, - * since this is a testing class, there is no need set up the hold - * env to get the meta data of the container. - * @param pipeline - * @return - * @throws IOException - */ - @Override - public ContainerData readContainer(Pipeline pipeline) throws IOException { - return ContainerData.newBuilder() - .setName(pipeline.getContainerName()) - .build(); - } - - /** - * Return reference to an *existing* container with given ID. - * - * @param containerId - * @return - * @throws IOException - */ - public Pipeline getContainer(String containerId) - throws IOException { - return ContainerLookUpService.lookUp(containerId).getPipeline(); - } - - @Override - public void closeContainer(Pipeline container) throws IOException { - // Do nothing, because the mock container does not have the notion of - // "open" and "close". - } - - @Override - public long getContainerSize(Pipeline pipeline) throws IOException { - // just return a constant value for now - return 5L * OzoneConsts.GB; // 5GB - } - - @Override - public Pipeline createContainer(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor replicationFactor, String containerId, - String owner) throws IOException { - int contId = currentContainerId.getAndIncrement(); - ContainerLookUpService.addContainer(Long.toString(contId)); - return ContainerLookUpService.lookUp(Long.toString(contId)) - .getPipeline(); - } - - /** - * Returns a set of Nodes that meet a query criteria. - * - * @param nodeStatuses - A set of criteria that we want the node to have. - * @param queryScope - Query scope - Cluster or pool. - * @param poolName - if it is pool, a pool name is required. - * @return A set of nodes that meet the requested criteria. - * @throws IOException - */ - @Override - public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> - nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) - throws IOException { - return null; - } - - /** - * Creates a specified replication pipeline. - * - * @param type - Type - * @param factor - Replication factor - * @param nodePool - Set of machines. - * @throws IOException - */ - @Override - public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) - throws IOException { - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/resources/dynamicprovisioner/expected1-pv.json ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/resources/dynamicprovisioner/expected1-pv.json b/hadoop-cblock/server/src/test/resources/dynamicprovisioner/expected1-pv.json deleted file mode 100644 index 569de61..0000000 --- a/hadoop-cblock/server/src/test/resources/dynamicprovisioner/expected1-pv.json +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -{ - "apiVersion": "v1", - "kind": "PersistentVolume", - "metadata": { - "annotations": { - "volume.beta.kubernetes.io/storage-class": "cblock", - "pv.kubernetes.io/provisioned-by": "hadoop.apache.org/cblock" - }, - "name": "volume1-b65d053d-f92e-11e7-be3b-84b261c34638", - "namespace": "ns" - }, - "spec": { - "accessModes": [ - "ReadWriteOnce" - ], - "capacity": { - "storage": "1Gi" - }, - "claimRef": { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "name": "volume1", - "namespace": "ns", - "uid": "b65d053d-f92e-11e7-be3b-84b261c34638" - }, - "iscsi": { - "fsType": "ext4", - "iqn": "iqn.2001-04.org.apache.hadoop:volume1-b65d053d-f92e-11e7-be3b-84b261c34638", - "lun": 0, - "portals": [ - "1.2.3.4:3260" - ], - "targetPortal": "1.2.3.4:3260" - }, - "persistentVolumeReclaimPolicy": "Delete" - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/resources/dynamicprovisioner/input1-pvc.json ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/resources/dynamicprovisioner/input1-pvc.json b/hadoop-cblock/server/src/test/resources/dynamicprovisioner/input1-pvc.json deleted file mode 100644 index 02815be..0000000 --- a/hadoop-cblock/server/src/test/resources/dynamicprovisioner/input1-pvc.json +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -{ - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": { - "annotations": { - "pv.kubernetes.io/bind-completed": "yes", - "pv.kubernetes.io/bound-by-controller": "yes", - "volume.beta.kubernetes.io/storage-provisioner": "hadoop.apache.org/cblock" - }, - "creationTimestamp": "2018-01-14T13:27:48Z", - "name": "volume1", - "namespace": "ns", - "resourceVersion": "5532691", - "selfLink": "/api/v1/namespaces/demo1/persistentvolumeclaims/persistent", - "uid": "b65d053d-f92e-11e7-be3b-84b261c34638" - }, - "spec": { - "accessModes": [ - "ReadWriteOnce" - ], - "resources": { - "requests": { - "storage": "1Gi" - } - }, - "storageClassName": "cblock", - "volumeName": "persistent-b65d053d-f92e-11e7-be3b-84b261c34638" - }, - "status": { - "accessModes": [ - "ReadWriteOnce" - ], - "capacity": { - "storage": "1Gi" - }, - "phase": "Bound" - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/tools/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/tools/pom.xml b/hadoop-cblock/tools/pom.xml deleted file mode 100644 index 150f2e5..0000000 --- a/hadoop-cblock/tools/pom.xml +++ /dev/null @@ -1,42 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 -http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-cblock</artifactId> - <version>3.2.0-SNAPSHOT</version> - </parent> - <artifactId>hadoop-cblock-tools</artifactId> - <version>3.2.0-SNAPSHOT</version> - <description>Apache Hadoop CBlock Tools</description> - <name>Apache Hadoop CBlock Tools</name> - <packaging>jar</packaging> - - <properties> - <hadoop.component>cblock</hadoop.component> - <is.hadoop.component>true</is.hadoop.component> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-cblock-server</artifactId> - </dependency> - </dependencies> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java b/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java deleted file mode 100644 index c6c0e84..0000000 --- a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.cli; - -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; - -import org.apache.hadoop.cblock.CblockUtils; -import org.apache.hadoop.cblock.client.CBlockVolumeClient; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.PrintStream; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.List; - -/** - * The command line tool class. - */ -public class CBlockCli extends Configured implements Tool { - - private static final String CREATE_VOLUME = "createVolume"; - - private static final String DELETE_VOLUME = "deleteVolume"; - - private static final String INFO_VOLUME = "infoVolume"; - - private static final String LIST_VOLUME = "listVolume"; - - private static final String SERVER_ADDR = "serverAddr"; - - private static final String HELP = "help"; - - private static final Logger LOG = - LoggerFactory.getLogger(CBlockCli.class); - private OzoneConfiguration conf; - - private PrintStream printStream; - - private Options options; - - private BasicParser parser; - - private CBlockVolumeClient localProxy; - - public CBlockCli(OzoneConfiguration conf, PrintStream printStream) - throws IOException { - this.printStream = printStream; - this.conf = conf; - this.options = getOptions(); - this.parser = new BasicParser(); - } - - public CBlockCli(OzoneConfiguration conf) throws IOException{ - this(conf, System.out); - } - - private CommandLine parseArgs(String[] argv) - throws ParseException { - return parser.parse(options, argv); - } - - private static Options getOptions() { - Options options = new Options(); - Option serverAddress = OptionBuilder - .withArgName("serverAddress>:<serverPort") - .withLongOpt(SERVER_ADDR) - .withValueSeparator(':') - .hasArgs(2) - .withDescription("specify server address:port") - .create("s"); - options.addOption(serverAddress); - - // taking 4 args: userName, volumeName, volumeSize, blockSize - Option createVolume = OptionBuilder - .withArgName("user> <volume> <volumeSize in [GB/TB]> <blockSize") - .withLongOpt(CREATE_VOLUME) - .withValueSeparator(' ') - .hasArgs(4) - .withDescription("create a fresh new volume") - .create("c"); - options.addOption(createVolume); - - // taking 2 args: userName, volumeName - Option deleteVolume = OptionBuilder - .withArgName("user> <volume") - .withLongOpt(DELETE_VOLUME) - .hasArgs(2) - .withDescription("delete a volume") - .create("d"); - options.addOption(deleteVolume); - - // taking 2 args: userName, volumeName - Option infoVolume = OptionBuilder - .withArgName("user> <volume") - .withLongOpt(INFO_VOLUME) - .hasArgs(2) - .withDescription("info a volume") - .create("i"); - options.addOption(infoVolume); - - // taking 1 arg: userName - Option listVolume = OptionBuilder - .withArgName("user") - .withLongOpt(LIST_VOLUME) - .hasOptionalArgs(1) - .withDescription("list all volumes") - .create("l"); - options.addOption(listVolume); - - Option help = OptionBuilder - .withLongOpt(HELP) - .withDescription("help") - .create("h"); - options.addOption(help); - - return options; - } - - @Override - public int run(String[] args) throws ParseException, IOException { - CommandLine commandLine = parseArgs(args); - if (commandLine.hasOption("s")) { - String[] serverAddrArgs = commandLine.getOptionValues("s"); - LOG.info("server address" + Arrays.toString(serverAddrArgs)); - String serverHost = serverAddrArgs[0]; - int serverPort = Integer.parseInt(serverAddrArgs[1]); - InetSocketAddress serverAddress = - new InetSocketAddress(serverHost, serverPort); - this.localProxy = new CBlockVolumeClient(conf, serverAddress); - } else { - this.localProxy = new CBlockVolumeClient(conf); - } - - if (commandLine.hasOption("h")) { - LOG.info("help"); - help(); - } - - if (commandLine.hasOption("c")) { - String[] createArgs = commandLine.getOptionValues("c"); - LOG.info("create volume:" + Arrays.toString(createArgs)); - createVolume(createArgs); - } - - if (commandLine.hasOption("d")) { - String[] deleteArgs = commandLine.getOptionValues("d"); - LOG.info("delete args:" + Arrays.toString(deleteArgs)); - deleteVolume(deleteArgs); - } - - if (commandLine.hasOption("l")) { - String[] listArg = commandLine.getOptionValues("l"); - LOG.info("list args:" + Arrays.toString(listArg)); - listVolume(listArg); - } - - if (commandLine.hasOption("i")) { - String[] infoArgs = commandLine.getOptionValues("i"); - LOG.info("info args:" + Arrays.toString(infoArgs)); - infoVolume(infoArgs); - } - return 0; - } - - public static void main(String[] argv) throws Exception { - CblockUtils.activateConfigs(); - OzoneConfiguration cblockConf = new OzoneConfiguration(); - RPC.setProtocolEngine(cblockConf, CBlockServiceProtocolPB.class, - ProtobufRpcEngine.class); - int res = 0; - Tool shell = new CBlockCli(cblockConf, System.out); - try { - ToolRunner.run(shell, argv); - } catch (Exception ex) { - LOG.error(ex.toString()); - res = 1; - } - System.exit(res); - } - - - - private void createVolume(String[] createArgs) throws IOException { - String userName = createArgs[0]; - String volumeName = createArgs[1]; - long volumeSize = CblockUtils.parseSize(createArgs[2]); - int blockSize = Integer.parseInt(createArgs[3])*1024; - localProxy.createVolume(userName, volumeName, volumeSize, blockSize); - } - - private void deleteVolume(String[] deleteArgs) throws IOException { - String userName = deleteArgs[0]; - String volumeName = deleteArgs[1]; - boolean force = false; - if (deleteArgs.length > 2) { - force = Boolean.parseBoolean(deleteArgs[2]); - } - localProxy.deleteVolume(userName, volumeName, force); - } - - private void infoVolume(String[] infoArgs) throws IOException { - String userName = infoArgs[0]; - String volumeName = infoArgs[1]; - VolumeInfo volumeInfo = localProxy.infoVolume(userName, volumeName); - printStream.println(volumeInfo.toString()); - } - - private void listVolume(String[] listArgs) throws IOException { - StringBuilder stringBuilder = new StringBuilder(); - List<VolumeInfo> volumeResponse; - if (listArgs == null) { - volumeResponse = localProxy.listVolume(null); - } else { - volumeResponse = localProxy.listVolume(listArgs[0]); - } - for (int i = 0; i<volumeResponse.size(); i++) { - stringBuilder.append( - String.format("%s:%s\t%d\t%d", volumeResponse.get(i).getUserName(), - volumeResponse.get(i).getVolumeName(), - volumeResponse.get(i).getVolumeSize(), - volumeResponse.get(i).getBlockSize())); - if (i < volumeResponse.size() - 1) { - stringBuilder.append("\n"); - } - } - printStream.println(stringBuilder); - } - - private void help() { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(100, "cblock", "", options, ""); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/package-info.java b/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/package-info.java deleted file mode 100644 index b8b1889..0000000 --- a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.cli; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
