HDDS-1. Remove SCM Block DB. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a43ac28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a43ac28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a43ac28

Branch: refs/heads/trunk
Commit: 3a43ac2851f5dea4deb8a1dfebf9bf65fc57bd76
Parents: a3a1552
Author: Anu Engineer <aengin...@apache.org>
Authored: Mon May 7 14:42:18 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Mon May 7 14:58:52 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/util/Time.java     |  12 +
 .../hadoop/hdds/scm/XceiverClientManager.java      |  17 +-
 .../scm/client/ContainerOperationClient.java       | 121 +++---
 .../hdds/scm/storage/ChunkInputStream.java         |  13 +-
 .../hdds/scm/storage/ChunkOutputStream.java        |  18 +-
 .../org/apache/hadoop/hdds/client/BlockID.java     |  59 +++
 .../hadoop/hdds/scm/client/ScmClient.java          |  46 +-
 .../common/helpers/AllocatedBlock.java             |  20 +-
 .../container/common/helpers/ContainerInfo.java    |  22 +-
 .../common/helpers/DeleteBlockResult.java          |  16 +-
 .../scm/container/common/helpers/Pipeline.java     |  18 +-
 .../common/helpers/PipelineChannel.java            |   2 +
 .../scm/protocol/ScmBlockLocationProtocol.java     |  12 -
 .../StorageContainerLocationProtocol.java          |  26 +-
 ...kLocationProtocolClientSideTranslatorPB.java    |  46 +-
 ...rLocationProtocolClientSideTranslatorPB.java    |  60 ++-
 .../scm/storage/ContainerProtocolCalls.java        |  66 ++-
 .../apache/hadoop/ozone/common/BlockGroup.java     |  26 +-
 .../ozone/common/DeleteBlockGroupResult.java       |  13 +-
 .../ozone/container/common/helpers/KeyData.java    |  41 +-
 ...kLocationProtocolServerSideTranslatorPB.java    |  38 +-
 ...rLocationProtocolServerSideTranslatorPB.java    |  30 +-
 .../org/apache/hadoop/utils/LevelDBStore.java      |   4 +-
 .../apache/hadoop/utils/MetadataKeyFilters.java    |  20 +-
 .../org/apache/hadoop/utils/RocksDBStore.java      |   1 +
 .../main/proto/DatanodeContainerProtocol.proto     |  82 ++--
 .../main/proto/ScmBlockLocationProtocol.proto      |  36 +-
 .../StorageContainerLocationProtocol.proto         |  16 +-
 hadoop-hdds/common/src/main/proto/hdds.proto       |  13 +-
 .../container/common/helpers/ChunkUtils.java       |  20 +-
 .../container/common/helpers/ContainerData.java    |  33 +-
 .../common/helpers/ContainerReport.java            |  19 +-
 .../common/helpers/ContainerUtils.java             |  14 +-
 .../helpers/DeletedContainerBlocksSummary.java     |  19 +-
 .../container/common/helpers/FileUtils.java        |   3 +-
 .../container/common/helpers/KeyUtils.java         |   6 +-
 .../container/common/impl/ChunkManagerImpl.java    |  76 ++--
 .../common/impl/ContainerManagerImpl.java          | 210 +++++-----
 .../ozone/container/common/impl/Dispatcher.java    | 121 +++---
 .../container/common/impl/KeyManagerImpl.java      |  66 ++-
 .../RandomContainerDeletionChoosingPolicy.java     |   4 +-
 ...NOrderedContainerDeletionChoosingPolicy.java    |   4 +-
 .../common/interfaces/ChunkManager.java            |  19 +-
 .../ContainerDeletionChoosingPolicy.java           |   2 +-
 .../common/interfaces/ContainerManager.java        |  76 ++--
 .../container/common/interfaces/KeyManager.java    |  17 +-
 .../background/BlockDeletingService.java           |   8 +-
 .../commandhandler/CloseContainerHandler.java      |   9 +-
 .../DeleteBlocksCommandHandler.java                |   9 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |   2 +-
 .../server/ratis/ContainerStateMachine.java        |  21 +-
 .../container/common/utils/ContainerCache.java     |  36 +-
 .../commands/CloseContainerCommand.java            |  14 +-
 .../StorageContainerDatanodeProtocol.proto         |   9 +-
 .../ozone/container/common/ScmTestMock.java        |   2 +-
 .../hadoop/hdds/scm/block/BlockManager.java        |  12 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    | 171 ++------
 .../block/DatanodeDeletedBlockTransactions.java    |   6 +-
 .../hadoop/hdds/scm/block/DeletedBlockLog.java     |   6 +-
 .../hdds/scm/block/DeletedBlockLogImpl.java        |  24 +-
 .../hdds/scm/container/ContainerMapping.java       |  80 ++--
 .../scm/container/ContainerStateManager.java       |  12 +-
 .../hadoop/hdds/scm/container/Mapping.java         |  33 +-
 .../scm/container/closer/ContainerCloser.java      |  16 +-
 .../container/replication/InProgressPool.java      |  10 +-
 .../hdds/scm/node/SCMNodePoolManager.java          |   2 +-
 .../hdds/scm/pipelines/PipelineManager.java        |  15 +-
 .../hdds/scm/pipelines/PipelineSelector.java       |   8 +-
 .../scm/pipelines/ratis/RatisManagerImpl.java      |   2 +-
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  20 +-
 .../scm/server/SCMClientProtocolServer.java        |  47 ++-
 .../scm/server/StorageContainerManager.java        |   9 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |  34 +-
 .../hdds/scm/block/TestDeletedBlockLog.java        |  56 +--
 .../scm/container/TestContainerMapping.java        |  79 ++--
 .../container/closer/TestContainerCloser.java      |  25 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   9 +-
 .../ozone/container/common/TestEndPoint.java       |   7 +-
 .../replication/TestContainerSupervisor.java       |  32 +-
 .../ReplicationDatanodeStateManager.java           |  11 +-
 .../cli/container/CloseContainerHandler.java       |  26 +-
 .../cli/container/ContainerCommandHandler.java     |   1 -
 .../cli/container/CreateContainerHandler.java      |  22 +-
 .../cli/container/DeleteContainerHandler.java      |  26 +-
 .../scm/cli/container/InfoContainerHandler.java    |  31 +-
 .../scm/cli/container/ListContainerHandler.java    |  12 +-
 .../ozone/client/io/ChunkGroupInputStream.java     |  25 +-
 .../ozone/client/io/ChunkGroupOutputStream.java    |  35 +-
 .../client/io/OzoneContainerTranslation.java       |  11 +-
 .../ozone/ksm/helpers/KsmKeyLocationInfo.java      |  41 +-
 .../ksm/helpers/KsmKeyLocationInfoGroup.java       |   2 +-
 .../hadoop/ozone/web/handlers/UserArgs.java        |   8 +-
 .../main/proto/KeySpaceManagerProtocol.proto       |  11 +-
 .../container/TestContainerStateManager.java       | 124 +++---
 .../hadoop/ozone/TestContainerOperations.java      |  10 +-
 .../hadoop/ozone/TestMiniOzoneCluster.java         |   3 +-
 .../ozone/TestStorageContainerManager.java         |  64 +--
 .../TestStorageContainerManagerHelper.java         |  33 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java       |  10 +-
 .../ozone/container/ContainerTestHelper.java       | 180 ++++----
 .../common/TestBlockDeletingService.java           |  25 +-
 .../TestContainerDeletionChoosingPolicy.java       |  19 +-
 .../common/impl/TestContainerPersistence.java      | 419 +++++++++----------
 .../TestCloseContainerHandler.java                 |  14 +-
 .../container/metrics/TestContainerMetrics.java    |  14 +-
 .../container/ozoneimpl/TestOzoneContainer.java    | 142 +++----
 .../ozoneimpl/TestOzoneContainerRatis.java         |   3 +-
 .../container/server/TestContainerServer.java      |  18 +-
 .../ozone/ksm/TestContainerReportWithKeys.java     |   8 +-
 .../hadoop/ozone/ksm/TestKeySpaceManager.java      |   8 -
 .../ozone/ksm/TestKsmBlockVersioning.java          |   2 +-
 .../hadoop/ozone/scm/TestAllocateContainer.java    |  24 +-
 .../hadoop/ozone/scm/TestContainerSQLCli.java      |  40 +-
 .../ozone/scm/TestContainerSmallFile.java          |  68 +--
 .../org/apache/hadoop/ozone/scm/TestSCMCli.java    | 210 ++++------
 .../apache/hadoop/ozone/scm/TestSCMMetrics.java    |   5 +-
 .../ozone/scm/TestXceiverClientManager.java        |  86 ++--
 .../ozone/scm/TestXceiverClientMetrics.java        |  19 +-
 .../hadoop/ozone/web/client/TestKeys.java          |   5 +-
 .../ozone/ksm/KSMMetadataManagerImpl.java          |  10 +-
 .../hadoop/ozone/ksm/KeyDeletingService.java       |   3 +-
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java    |   6 +-
 .../hadoop/ozone/ksm/KeySpaceManager.java          |   2 +-
 .../hadoop/ozone/ksm/OpenKeyCleanupService.java    |   3 +-
 .../genesis/BenchMarkContainerStateMap.java        |  10 +-
 .../genesis/BenchMarkDatanodeDispatcher.java       | 170 +++++---
 .../ozone/genesis/BenchMarkRocksDbStore.java       |  14 +-
 127 files changed, 2059 insertions(+), 2402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
index db5a567..42005f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.util;
 
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.TimeZone;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -34,6 +36,8 @@ public final class Time {
    */
   private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
 
+  private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");
+
   private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
       new ThreadLocal<SimpleDateFormat>() {
         @Override
@@ -82,4 +86,12 @@ public final class Time {
   public static String formatTime(long millis) {
     return DATE_FORMAT.get().format(millis);
   }
+
+  /**
+   * Get the current UTC time in milliseconds.
+   * @return the current UTC time in milliseconds.
+   */
+  public static long getUtcTime() {
+    return Calendar.getInstance(UTC_ZONE).getTimeInMillis();
+  }
 }
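[Editor's note: since Calendar#getTimeInMillis() always returns milliseconds since the epoch regardless of the calendar's zone, the new helper is effectively a UTC-labelled epoch clock. A minimal, self-contained sketch of the same pattern; the class and main method below are ours, not part of the patch.]

    import java.util.Calendar;
    import java.util.TimeZone;

    public class UtcTimeDemo {
      private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");

      // Same pattern as Time#getUtcTime(): getTimeInMillis() is epoch-based,
      // so the explicit UTC zone mainly documents intent.
      public static long getUtcTime() {
        return Calendar.getInstance(UTC_ZONE).getTimeInMillis();
      }

      public static void main(String[] args) {
        System.out.println("UTC millis: " + getUtcTime());
      }
    }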
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 7585104..dcaa576 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -60,7 +60,7 @@ public class XceiverClientManager implements Closeable {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private final Cache<String, XceiverClientSpi> clientCache;
+  private final Cache<Long, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
   private static XceiverClientMetrics metrics;
@@ -84,10 +84,10 @@ public class XceiverClientManager implements Closeable {
         .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
         .maximumSize(maxSize)
         .removalListener(
-            new RemovalListener<String, XceiverClientSpi>() {
+            new RemovalListener<Long, XceiverClientSpi>() {
             @Override
             public void onRemoval(
-                RemovalNotification<String, XceiverClientSpi>
+                RemovalNotification<Long, XceiverClientSpi>
                   removalNotification) {
               synchronized (clientCache) {
                 // Mark the entry as evicted
@@ -99,7 +99,7 @@ public class XceiverClientManager implements Closeable {
   }
 
   @VisibleForTesting
-  public Cache<String, XceiverClientSpi> getClientCache() {
+  public Cache<Long, XceiverClientSpi> getClientCache() {
     return clientCache;
   }
 
@@ -114,14 +114,14 @@ public class XceiverClientManager implements Closeable {
    * @return XceiverClientSpi connected to a container
    * @throws IOException if a XceiverClientSpi cannot be acquired
    */
-  public XceiverClientSpi acquireClient(Pipeline pipeline)
+  public XceiverClientSpi acquireClient(Pipeline pipeline, long containerID)
       throws IOException {
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkArgument(pipeline.getMachines() != null);
     Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
 
     synchronized (clientCache) {
-      XceiverClientSpi info = getClient(pipeline);
+      XceiverClientSpi info = getClient(pipeline, containerID);
       info.incrementReference();
       return info;
     }
@@ -139,11 +139,10 @@ public class XceiverClientManager implements Closeable {
     }
   }
 
-  private XceiverClientSpi getClient(Pipeline pipeline)
+  private XceiverClientSpi getClient(Pipeline pipeline, long containerID)
       throws IOException {
-    String containerName = pipeline.getContainerName();
     try {
-      return clientCache.get(containerName,
+      return clientCache.get(containerID,
           new Callable<XceiverClientSpi>() {
           @Override
           public XceiverClientSpi call() throws Exception {
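[Editor's note: the client cache is now keyed by the numeric container ID rather than a container name. A self-contained sketch of the same Guava caching pattern, assuming placeholder types; the Client class and size constants below are ours, not the real XceiverClientSpi wiring.]

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;
    import java.util.concurrent.TimeUnit;

    public class ClientCacheSketch {
      // Placeholder for XceiverClientSpi.
      static class Client {
        void close() { }
      }

      public static void main(String[] args) throws Exception {
        // Close a connection when its cache entry is evicted.
        RemovalListener<Long, Client> onEvict = n -> n.getValue().close();
        Cache<Long, Client> clientCache = CacheBuilder.newBuilder()
            .expireAfterAccess(10_000, TimeUnit.MILLISECONDS) // stale threshold
            .maximumSize(256)                                 // max cached clients
            .removalListener(onEvict)
            .build();

        long containerID = 42L;
        // Loads on a miss, just like getClient(pipeline, containerID) above.
        Client client = clientCache.get(containerID, Client::new);
        System.out.println(client != null);
      }
    }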
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 8f30a7f..15d197c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -86,15 +86,16 @@ public class ContainerOperationClient implements ScmClient {
    * @inheritDoc
    */
   @Override
-  public Pipeline createContainer(String containerId, String owner)
+  public ContainerInfo createContainer(String owner)
       throws IOException {
     XceiverClientSpi client = null;
     try {
-      Pipeline pipeline =
+      ContainerInfo container =
           storageContainerLocationClient.allocateContainer(
               xceiverClientManager.getType(),
-              xceiverClientManager.getFactor(), containerId, owner);
-      client = xceiverClientManager.acquireClient(pipeline);
+              xceiverClientManager.getFactor(), owner);
+      Pipeline pipeline = container.getPipeline();
+      client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());
 
       // Allocated State means that SCM has allocated this pipeline in its
       // namespace. The client needs to create the pipeline on the machines
@@ -104,10 +105,8 @@
       if (pipeline.getLifeCycleState() == ALLOCATED) {
         createPipeline(client, pipeline);
       }
-      // TODO : Container Client State needs to be updated.
-      // TODO : Return ContainerInfo instead of Pipeline
-      createContainer(containerId, client, pipeline);
-      return pipeline;
+      createContainer(client, container.getContainerID());
+      return container;
     } finally {
       if (client != null) {
         xceiverClientManager.releaseClient(client);
       }
     }
   }
 
   /**
    * Create a container over pipeline specified by the SCM.
    *
-   * @param containerId - Container ID
-   * @param client - Client to communicate with Datanodes
-   * @param pipeline - A pipeline that is already created.
+   * @param client - Client to communicate with Datanodes.
+   * @param containerId - Container ID.
    * @throws IOException
    */
-  public void createContainer(String containerId, XceiverClientSpi client,
-      Pipeline pipeline) throws IOException {
+  public void createContainer(XceiverClientSpi client,
+      long containerId) throws IOException {
     String traceID = UUID.randomUUID().toString();
     storageContainerLocationClient.notifyObjectStageChange(
         ObjectStageChangeRequestProto.Type.container,
         containerId,
         ObjectStageChangeRequestProto.Op.create,
         ObjectStageChangeRequestProto.Stage.begin);
-    ContainerProtocolCalls.createContainer(client, traceID);
+    ContainerProtocolCalls.createContainer(client, containerId, traceID);
     storageContainerLocationClient.notifyObjectStageChange(
         ObjectStageChangeRequestProto.Type.container,
         containerId,
@@ -142,8 +140,8 @@
     // creation state.
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created container " + containerId
-          + " leader:" + pipeline.getLeader()
-          + " machines:" + pipeline.getMachines());
+          + " leader:" + client.getPipeline().getLeader()
+          + " machines:" + client.getPipeline().getMachines());
     }
   }
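[Editor's note: datanode-side creation is bracketed by SCM stage notifications, as the method above shows. Condensed into the begin/act/complete sequence; the trailing Stage.complete arguments are cut off by the hunk boundary above, so completing them this way is our inference from the begin/complete pattern used elsewhere in this patch.]

    storageContainerLocationClient.notifyObjectStageChange(
        ObjectStageChangeRequestProto.Type.container, containerId,
        ObjectStageChangeRequestProto.Op.create,
        ObjectStageChangeRequestProto.Stage.begin);
    ContainerProtocolCalls.createContainer(client, containerId, traceID);
    storageContainerLocationClient.notifyObjectStageChange(
        ObjectStageChangeRequestProto.Type.container, containerId,
        ObjectStageChangeRequestProto.Op.create,
        ObjectStageChangeRequestProto.Stage.complete); // inferred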
@@ -168,20 +166,25 @@
     // 2. Talk to Datanodes to create the pipeline.
     //
     // 3. update SCM that pipeline creation was successful.
-    storageContainerLocationClient.notifyObjectStageChange(
-        ObjectStageChangeRequestProto.Type.pipeline,
-        pipeline.getPipelineName(),
-        ObjectStageChangeRequestProto.Op.create,
-        ObjectStageChangeRequestProto.Stage.begin);
+
+    // TODO: this has not been fully implemented on server side
+    // SCMClientProtocolServer#notifyObjectStageChange
+    // TODO: when implement the pipeline state machine, change
+    // the pipeline name (string) to pipeline id (long)
+    //storageContainerLocationClient.notifyObjectStageChange(
+    //    ObjectStageChangeRequestProto.Type.pipeline,
+    //    pipeline.getPipelineName(),
+    //    ObjectStageChangeRequestProto.Op.create,
+    //    ObjectStageChangeRequestProto.Stage.begin);
 
     client.createPipeline(pipeline.getPipelineName(),
         pipeline.getMachines());
 
-    storageContainerLocationClient.notifyObjectStageChange(
-        ObjectStageChangeRequestProto.Type.pipeline,
-        pipeline.getPipelineName(),
-        ObjectStageChangeRequestProto.Op.create,
-        ObjectStageChangeRequestProto.Stage.complete);
+    //storageContainerLocationClient.notifyObjectStageChange(
+    //    ObjectStageChangeRequestProto.Type.pipeline,
+    //    pipeline.getPipelineName(),
+    //    ObjectStageChangeRequestProto.Op.create,
+    //    ObjectStageChangeRequestProto.Stage.complete);
 
     // TODO : Should we change the state on the client side ??
     // That makes sense, but it is not needed for the client to work.
@@ -193,16 +196,17 @@
   /**
    * @inheritDoc
    */
   @Override
-  public Pipeline createContainer(HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor factor,
-      String containerId, String owner) throws IOException {
+  public ContainerInfo createContainer(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor, String owner) throws IOException {
     XceiverClientSpi client = null;
     try {
       // allocate container on SCM.
-      Pipeline pipeline =
+      ContainerInfo container =
           storageContainerLocationClient.allocateContainer(type, factor,
-              containerId, owner);
-      client = xceiverClientManager.acquireClient(pipeline);
+              owner);
+      Pipeline pipeline = container.getPipeline();
+      client = xceiverClientManager.acquireClient(pipeline,
+          container.getContainerID());
 
       // Allocated State means that SCM has allocated this pipeline in its
       // namespace. The client needs to create the pipeline on the machines
       if (pipeline.getLifeCycleState() == ALLOCATED) {
         createPipeline(client, pipeline);
       }
-
-      // TODO : Return ContainerInfo instead of Pipeline
       // connect to pipeline leader and allocate container on leader datanode.
-      client = xceiverClientManager.acquireClient(pipeline);
-      createContainer(containerId, client, pipeline);
-      return pipeline;
+      client = xceiverClientManager.acquireClient(pipeline,
+          container.getContainerID());
+      createContainer(client, container.getContainerID());
+      return container;
     } finally {
       if (client != null) {
         xceiverClientManager.releaseClient(client);
       }
     }
   }
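[Editor's note: end to end, the client-side create flow now runs: allocate on SCM, build the pipeline if it is still ALLOCATED, then create the container on the datanodes. A condensed sketch using the names from the diff above.]

    ContainerInfo container = storageContainerLocationClient.allocateContainer(
        type, factor, owner);                      // 1. SCM allocates ID + pipeline
    Pipeline pipeline = container.getPipeline();
    XceiverClientSpi client = xceiverClientManager.acquireClient(
        pipeline, container.getContainerID());
    try {
      if (pipeline.getLifeCycleState() == ALLOCATED) {
        createPipeline(client, pipeline);          // 2. datanodes form the pipeline
      }
      createContainer(client, container.getContainerID()); // 3. create on datanodes
    } finally {
      xceiverClientManager.releaseClient(client);
    }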
@@ -258,18 +261,18 @@
    * @throws IOException
    */
   @Override
-  public void deleteContainer(Pipeline pipeline, boolean force)
+  public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
       throws IOException {
     XceiverClientSpi client = null;
     try {
-      client = xceiverClientManager.acquireClient(pipeline);
+      client = xceiverClientManager.acquireClient(pipeline, containerID);
       String traceID = UUID.randomUUID().toString();
-      ContainerProtocolCalls.deleteContainer(client, force, traceID);
+      ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
       storageContainerLocationClient
-          .deleteContainer(pipeline.getContainerName());
+          .deleteContainer(containerID);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleted container {}, leader: {}, machines: {} ",
-            pipeline.getContainerName(),
+            containerID,
             pipeline.getLeader(),
             pipeline.getMachines());
       }
@@ -284,11 +287,10 @@
    * {@inheritDoc}
    */
   @Override
-  public List<ContainerInfo> listContainer(String startName,
-      String prefixName, int count)
-      throws IOException {
+  public List<ContainerInfo> listContainer(long startContainerID,
+      int count) throws IOException {
     return storageContainerLocationClient.listContainer(
-        startName, prefixName, count);
+        startContainerID, count);
   }
 
   /**
@@ -300,17 +302,17 @@
    * @throws IOException
    */
   @Override
-  public ContainerData readContainer(Pipeline pipeline) throws IOException {
+  public ContainerData readContainer(long containerID,
+      Pipeline pipeline) throws IOException {
     XceiverClientSpi client = null;
     try {
-      client = xceiverClientManager.acquireClient(pipeline);
+      client = xceiverClientManager.acquireClient(pipeline, containerID);
       String traceID = UUID.randomUUID().toString();
       ReadContainerResponseProto response =
-          ContainerProtocolCalls.readContainer(client,
-              pipeline.getContainerName(), traceID);
+          ContainerProtocolCalls.readContainer(client, containerID, traceID);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Read container {}, leader: {}, machines: {} ",
-            pipeline.getContainerName(),
+            containerID,
             pipeline.getLeader(),
             pipeline.getMachines());
       }
@@ -329,7 +331,7 @@
    * @throws IOException
    */
   @Override
-  public Pipeline getContainer(String containerId) throws
+  public ContainerInfo getContainer(long containerId) throws
       IOException {
     return storageContainerLocationClient.getContainer(containerId);
   }
@@ -341,7 +343,8 @@
    * @throws IOException
    */
   @Override
-  public void closeContainer(Pipeline pipeline) throws IOException {
+  public void closeContainer(long containerId, Pipeline pipeline)
+      throws IOException {
     XceiverClientSpi client = null;
     try {
       LOG.debug("Close container {}", pipeline);
@@ -364,18 +367,16 @@
        For now, take the #2 way.
        */
       // Actually close the container on Datanode
-      client = xceiverClientManager.acquireClient(pipeline);
+      client = xceiverClientManager.acquireClient(pipeline, containerId);
 
       String traceID = UUID.randomUUID().toString();
 
-      String containerId = pipeline.getContainerName();
-
       storageContainerLocationClient.notifyObjectStageChange(
           ObjectStageChangeRequestProto.Type.container,
           containerId,
           ObjectStageChangeRequestProto.Op.close,
           ObjectStageChangeRequestProto.Stage.begin);
 
-      ContainerProtocolCalls.closeContainer(client, traceID);
+      ContainerProtocolCalls.closeContainer(client, containerId, traceID);
       // Notify SCM to close the container
       storageContainerLocationClient.notifyObjectStageChange(
           ObjectStageChangeRequestProto.Type.container,
@@ -391,13 +392,13 @@
 
   /**
    * Get the the current usage information.
-   * @param pipeline - Pipeline
+   * @param containerID - ID of the container.
    * @return the size of the given container.
    * @throws IOException
    */
   @Override
-  public long getContainerSize(Pipeline pipeline) throws IOException {
-    // TODO : Pipeline can be null, handle it correctly.
+  public long getContainerSize(long containerID) throws IOException {
+    // TODO : Fix this, it currently returns the capacity but not the current usage.
     long size = getContainerSizeB();
     if (size == -1) {
       throw new IOException("Container size unknown!");
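[Editor's note: deletion orders the two sides the same way the debug log suggests: the datanode replicas go first, then SCM drops its mapping. A two-line sketch of the sequence from deleteContainer above.]

    ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID); // datanodes first
    storageContainerLocationClient.deleteContainer(containerID);                // then SCM's mapping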
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 9b8eaa9..c4c3362 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
     .ReadChunkResponseProto;
+import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -45,7 +46,7 @@
 
   private static final int EOF = -1;
 
-  private final String key;
+  private final BlockID blockID;
   private final String traceID;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
@@ -58,15 +59,15 @@
   /**
    * Creates a new ChunkInputStream.
    *
-   * @param key chunk key
+   * @param blockID block ID of the chunk
    * @param xceiverClientManager client manager that controls client
    * @param xceiverClient client to perform container calls
    * @param chunks list of chunks to read
    * @param traceID container protocol call traceID
    */
-  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
+  public ChunkInputStream(BlockID blockID, XceiverClientManager xceiverClientManager,
       XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
-    this.key = key;
+    this.blockID = blockID;
     this.traceID = traceID;
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
@@ -196,7 +197,7 @@
     final ReadChunkResponseProto readChunkResponse;
     try {
       readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-          chunks.get(chunkIndex), key, traceID);
+          chunks.get(chunkIndex), blockID, traceID);
     } catch (IOException e) {
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
@@ -211,7 +212,7 @@
         || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
         .getLen()) {
       throw new EOFException(
-          "EOF encountered pos: " + pos + " container key: " + key);
+          "EOF encountered pos: " + pos + " container key: " + blockID.getLocalID());
     }
     if (chunkIndex == -1) {
       chunkIndex = Arrays.binarySearch(chunkOffset, pos);
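[Editor's note: callers now hand the stream a BlockID instead of a string key. A construction sketch; the manager, client, chunk list, and trace ID are assumed to be in scope.]

    BlockID blockID = new BlockID(containerID, localID);
    ChunkInputStream in = new ChunkInputStream(
        blockID, xceiverClientManager, xceiverClient, chunks, traceID);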
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index b65df9f..325f110 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyData;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
+import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -53,7 +54,7 @@
  */
 public class ChunkOutputStream extends OutputStream {
 
-  private final String containerKey;
+  private final BlockID blockID;
   private final String key;
   private final String traceID;
   private final KeyData.Builder containerKeyData;
@@ -67,25 +68,24 @@ public class ChunkOutputStream extends OutputStream {
   /**
    * Creates a new ChunkOutputStream.
    *
-   * @param containerKey container key
+   * @param blockID block ID
    * @param key chunk key
    * @param xceiverClientManager client manager that controls client
    * @param xceiverClient client to perform container calls
    * @param traceID container protocol call args
    * @param chunkSize chunk size
    */
-  public ChunkOutputStream(String containerKey, String key,
-      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
-      String traceID, int chunkSize) {
-    this.containerKey = containerKey;
+  public ChunkOutputStream(BlockID blockID, String key,
+      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+      String traceID, int chunkSize) {
+    this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
     this.chunkSize = chunkSize;
     KeyValue keyValue = KeyValue.newBuilder()
         .setKey("TYPE").setValue("KEY").build();
     this.containerKeyData = KeyData.newBuilder()
-        .setContainerName(xceiverClient.getPipeline().getContainerName())
-        .setName(containerKey)
+        .setBlockID(blockID.getProtobuf())
         .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
@@ -217,7 +217,7 @@
         .setLen(data.size())
         .build();
     try {
-      writeChunk(xceiverClient, chunk, key, data, traceID);
+      writeChunk(xceiverClient, chunk, blockID, data, traceID);
     } catch (IOException e) {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
new file mode 100644
index 0000000..7236af7
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.client;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * BlockID of ozone (containerID + localID)
+ */
+public class BlockID {
+  private long containerID;
+  private long localID;
+
+  public BlockID(long containerID, long localID) {
+    this.containerID = containerID;
+    this.localID = localID;
+  }
+
+  public long getContainerID() {
+    return containerID;
+  }
+
+  public long getLocalID() {
+    return localID;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this).
+        append("containerID", containerID).
+        append("localID", localID).
+        toString();
+  }
+
+  public HddsProtos.BlockID getProtobuf() {
+    return HddsProtos.BlockID.newBuilder().
+        setContainerID(containerID).setLocalID(localID).build();
+  }
+
+  public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
+    return new BlockID(blockID.getContainerID(),
+        blockID.getLocalID());
+  }
+}
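[Editor's note: the new BlockID is the address every client API above now shares, a (containerID, localID) pair that round-trips through HddsProtos.BlockID. A usage sketch with arbitrary example IDs.]

    BlockID original = new BlockID(7L, 1001L);
    HddsProtos.BlockID wire = original.getProtobuf();   // serialize for RPC
    BlockID copy = BlockID.getFromProtobuf(wire);       // rebuild on the far side
    assert copy.getContainerID() == 7L && copy.getLocalID() == 1001L;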
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 0d4a299..dcf9fed 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -41,78 +41,76 @@ import java.util.List;
 public interface ScmClient {
   /**
    * Creates a Container on SCM and returns the pipeline.
-   * @param containerId - String container ID
-   * @return Pipeline
+   * @return ContainerInfo
    * @throws IOException
    */
-  Pipeline createContainer(String containerId, String owner) throws IOException;
+  ContainerInfo createContainer(String owner) throws IOException;
 
   /**
    * Gets a container by Name -- Throws if the container does not exist.
-   * @param containerId - String Container ID
+   * @param containerId - Container ID
    * @return Pipeline
    * @throws IOException
    */
-  Pipeline getContainer(String containerId) throws IOException;
+  ContainerInfo getContainer(long containerId) throws IOException;
 
   /**
-   * Close a container by name.
+   * Close a container.
    *
-   * @param pipeline the container to be closed.
+   * @param containerId - ID of the container.
+   * @param pipeline - Pipeline where the container is located.
    * @throws IOException
    */
-  void closeContainer(Pipeline pipeline) throws IOException;
+  void closeContainer(long containerId, Pipeline pipeline) throws IOException;
 
   /**
    * Deletes an existing container.
+   * @param containerId - ID of the container.
    * @param pipeline - Pipeline that represents the container.
    * @param force - true to forcibly delete the container.
    * @throws IOException
    */
-  void deleteContainer(Pipeline pipeline, boolean force) throws IOException;
+  void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException;
 
   /**
    * Lists a range of containers and get their info.
    *
-   * @param startName start name, if null, start searching at the head.
-   * @param prefixName prefix name, if null, then filter is disabled.
-   * @param count count, if count < 0, the max size is unlimited.(
-   *              Usually the count will be replace with a very big
-   *              value instead of being unlimited in case the db is very big)
+   * @param startContainerID start containerID.
+   * @param count count must be > 0.
    *
    * @return a list of pipeline.
    * @throws IOException
    */
-  List<ContainerInfo> listContainer(String startName, String prefixName,
+  List<ContainerInfo> listContainer(long startContainerID,
       int count) throws IOException;
 
   /**
    * Read meta data from an existing container.
-   * @param pipeline - Pipeline that represents the container.
+   * @param containerID - ID of the container.
+   * @param pipeline - Pipeline where the container is located.
    * @return ContainerInfo
    * @throws IOException
    */
-  ContainerData readContainer(Pipeline pipeline) throws IOException;
-
+  ContainerData readContainer(long containerID, Pipeline pipeline)
+      throws IOException;
 
   /**
    * Gets the container size -- Computed by SCM from Container Reports.
-   * @param pipeline - Pipeline
+   * @param containerID - ID of the container.
   * @return number of bytes used by this container.
    * @throws IOException
    */
-  long getContainerSize(Pipeline pipeline) throws IOException;
+  long getContainerSize(long containerID) throws IOException;
 
   /**
    * Creates a Container on SCM and returns the pipeline.
    * @param type - Replication Type.
    * @param replicationFactor - Replication Factor
-   * @param containerId - Container ID
-   * @return Pipeline
+   * @return ContainerInfo
    * @throws IOException - in case of error.
    */
-  Pipeline createContainer(HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor replicationFactor, String containerId,
+  ContainerInfo createContainer(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor replicationFactor,
       String owner) throws IOException;
 
   /**


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
index d253b15..9b89469 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
@@ -18,13 +18,15 @@
 
 package org.apache.hadoop.hdds.scm.container.common.helpers;
 
+import org.apache.hadoop.hdds.client.BlockID;
+
 /**
  * Allocated block wraps the result returned from SCM#allocateBlock which
  * contains a Pipeline and the key.
  */
 public final class AllocatedBlock {
   private Pipeline pipeline;
-  private String key;
+  private BlockID blockID;
   // Indicates whether the client should create container before writing block.
   private boolean shouldCreateContainer;
@@ -33,7 +35,7 @@ public final class AllocatedBlock {
    */
   public static class Builder {
     private Pipeline pipeline;
-    private String key;
+    private BlockID blockID;
     private boolean shouldCreateContainer;
 
     public Builder setPipeline(Pipeline p) {
@@ -41,8 +43,8 @@ public final class AllocatedBlock {
       return this;
     }
 
-    public Builder setKey(String k) {
-      this.key = k;
+    public Builder setBlockID(BlockID blockID) {
+      this.blockID = blockID;
       return this;
     }
 
@@ -52,14 +54,14 @@ public final class AllocatedBlock {
     }
 
     public AllocatedBlock build() {
-      return new AllocatedBlock(pipeline, key, shouldCreateContainer);
+      return new AllocatedBlock(pipeline, blockID, shouldCreateContainer);
     }
   }
 
-  private AllocatedBlock(Pipeline pipeline, String key,
+  private AllocatedBlock(Pipeline pipeline, BlockID blockID,
       boolean shouldCreateContainer) {
     this.pipeline = pipeline;
-    this.key = key;
+    this.blockID = blockID;
     this.shouldCreateContainer = shouldCreateContainer;
   }
 
@@ -67,8 +69,8 @@ public final class AllocatedBlock {
     return pipeline;
   }
 
-  public String getKey() {
-    return key;
+  public BlockID getBlockID() {
+    return blockID;
   }
 
   public boolean getCreateContainer() {
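[Editor's note: with the string key replaced by a BlockID, building an allocation result looks like this. A sketch; the pipeline is assumed given, and the IDs are arbitrary examples.]

    AllocatedBlock block = new AllocatedBlock.Builder()
        .setPipeline(pipeline)
        .setBlockID(new BlockID(7L, 1001L))
        .setShouldCreateContainer(false)
        .build();
    BlockID id = block.getBlockID();   // replaces the old getKey()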
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index 823a7fb..0bd4c26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -43,11 +43,9 @@ public class ContainerInfo
   // The wall-clock ms since the epoch at which the current state enters.
   private long stateEnterTime;
   private String owner;
-  private String containerName;
   private long containerID;
   ContainerInfo(
       long containerID,
-      final String containerName,
       HddsProtos.LifeCycleState state,
       Pipeline pipeline,
       long allocatedBytes,
@@ -56,7 +54,6 @@ public class ContainerInfo
       long stateEnterTime,
       String owner) {
     this.containerID = containerID;
-    this.containerName = containerName;
     this.pipeline = pipeline;
     this.allocatedBytes = allocatedBytes;
     this.usedBytes = usedBytes;
@@ -82,7 +79,6 @@ public class ContainerInfo
     builder.setState(info.getState());
     builder.setStateEnterTime(info.getStateEnterTime());
     builder.setOwner(info.getOwner());
-    builder.setContainerName(info.getContainerName());
     builder.setContainerID(info.getContainerID());
     return builder.build();
   }
@@ -91,10 +87,6 @@ public class ContainerInfo
     return containerID;
   }
 
-  public String getContainerName() {
-    return containerName;
-  }
-
   public HddsProtos.LifeCycleState getState() {
     return state;
   }
@@ -170,7 +162,6 @@ public class ContainerInfo
     if (getOwner() != null) {
       builder.setOwner(getOwner());
     }
-    builder.setContainerName(getContainerName());
     return builder.build();
   }
 
@@ -189,7 +180,6 @@ public class ContainerInfo
         + ", pipeline=" + pipeline
         + ", stateEnterTime=" + stateEnterTime
         + ", owner=" + owner
-        + ", containerName='" + containerName
         + '}';
   }
 
@@ -206,7 +196,7 @@ public class ContainerInfo
     ContainerInfo that = (ContainerInfo) o;
 
     return new EqualsBuilder()
-        .append(pipeline.getContainerName(), that.pipeline.getContainerName())
+        .append(getContainerID(), that.getContainerID())
 
         // TODO : Fix this later. If we add these factors some tests fail.
         // So Commenting this to continue and will enforce this with
@@ -221,7 +211,7 @@ public class ContainerInfo
   @Override
   public int hashCode() {
     return new HashCodeBuilder(11, 811)
-        .append(pipeline.getContainerName())
+        .append(getContainerID())
         .append(pipeline.getFactor())
         .append(pipeline.getType())
         .append(owner)
@@ -275,7 +265,6 @@ public class ContainerInfo
     private long keys;
     private long stateEnterTime;
     private String owner;
-    private String containerName;
     private long containerID;
 
     public Builder setContainerID(long id) {
@@ -319,14 +308,9 @@ public class ContainerInfo
       return this;
     }
 
-    public Builder setContainerName(String container) {
-      this.containerName = container;
-      return this;
-    }
-
    public ContainerInfo build() {
       return new
-          ContainerInfo(containerID, containerName, state, pipeline,
+          ContainerInfo(containerID, state, pipeline,
           allocated, used, keys, stateEnterTime, owner);
     }
   }
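[Editor's note: container identity now hangs entirely off the numeric ID; equals/hashCode compare getContainerID() plus the pipeline's factor/type and owner instead of a name. A sketch of the consequence; setPipeline/setOwner are assumed from the builder's fields, since only setContainerID is visible in the hunk above.]

    ContainerInfo a = new ContainerInfo.Builder()
        .setContainerID(7L).setPipeline(pipeline).setOwner("ozone").build();
    ContainerInfo b = new ContainerInfo.Builder()
        .setContainerID(7L).setPipeline(pipeline).setOwner("ozone").build();
    assert a.equals(b) && a.hashCode() == b.hashCode(); // same ID, same identity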
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
index fd97eae..5f5aace 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hdds.scm.container.common.helpers;
 
+import org.apache.hadoop.hdds.client.BlockID;
+
 import static org.apache.hadoop.hdds.protocol.proto
     .ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
 
@@ -24,21 +26,21 @@ import static org.apache.hadoop.hdds.protocol.proto
  * Class wraps storage container manager block deletion results.
  */
 public class DeleteBlockResult {
-  private String key;
+  private BlockID blockID;
   private DeleteScmBlockResult.Result result;
 
-  public DeleteBlockResult(final String key,
+  public DeleteBlockResult(final BlockID blockID,
       final DeleteScmBlockResult.Result result) {
-    this.key = key;
+    this.blockID = blockID;
     this.result = result;
   }
 
   /**
-   * Get key deleted.
-   * @return key name.
+   * Get block id deleted.
+   * @return block id.
    */
-  public String getKey() {
-    return key;
+  public BlockID getBlockID() {
+    return blockID;
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 32d0a2d..8740838 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -57,7 +57,6 @@ public class Pipeline {
     WRITER = mapper.writer(filters);
   }
 
-  private String containerName;
   private PipelineChannel pipelineChannel;
   /**
    * Allows you to maintain private data on pipelines. This is not serialized
@@ -68,11 +67,9 @@ public class Pipeline {
   /**
    * Constructs a new pipeline data structure.
    *
-   * @param containerName - Container
    * @param pipelineChannel - transport information for this container
    */
-  public Pipeline(String containerName, PipelineChannel pipelineChannel) {
-    this.containerName = containerName;
+  public Pipeline(PipelineChannel pipelineChannel) {
     this.pipelineChannel = pipelineChannel;
     data = null;
   }
@@ -87,7 +84,7 @@ public class Pipeline {
     Preconditions.checkNotNull(pipeline);
     PipelineChannel pipelineChannel =
         PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel());
-    return new Pipeline(pipeline.getContainerName(), pipelineChannel);
+    return new Pipeline(pipelineChannel);
   }
 
   public HddsProtos.ReplicationFactor getFactor() {
@@ -146,21 +143,11 @@ public class Pipeline {
   public HddsProtos.Pipeline getProtobufMessage() {
     HddsProtos.Pipeline.Builder builder =
         HddsProtos.Pipeline.newBuilder();
-    builder.setContainerName(this.containerName);
     builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage());
     return builder.build();
   }
 
   /**
-   * Returns containerName if available.
-   *
-   * @return String.
-   */
-  public String getContainerName() {
-    return containerName;
-  }
-
-  /**
    * Returns private data that is set on this pipeline.
    *
    * @return blob, the user can interpret it any way they like.
@@ -223,7 +210,6 @@ public class Pipeline {
     pipelineChannel.getDatanodes().keySet().stream()
         .forEach(id -> b.
             append(id.endsWith(pipelineChannel.getLeaderID()) ?
"*" + id : id)); - b.append("] container:").append(containerName); b.append(" name:").append(getPipelineName()); if (getType() != null) { b.append(" type:").append(getType().toString()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java index ebd52e9..655751d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java @@ -40,6 +40,8 @@ public class PipelineChannel { private ReplicationType type; private ReplicationFactor factor; private String name; + // TODO: change to long based id + //private long id; public PipelineChannel(String leaderID, LifeCycleState lifeCycleState, ReplicationType replicationType, ReplicationFactor replicationFactor, http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java index f100fc7..c8d4a80 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java @@ -26,7 +26,6 @@ import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; import java.io.IOException; import java.util.List; -import java.util.Set; /** * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes @@ -35,17 +34,6 @@ import java.util.Set; public interface ScmBlockLocationProtocol { /** - * Find the set of nodes to read/write a block, as - * identified by the block key. This method supports batch lookup by - * passing multiple keys. - * - * @param keys batch of block keys to find - * @return allocated blocks for each block key - * @throws IOException if there is any failure - */ - Set<AllocatedBlock> getBlockLocations(Set<String> keys) throws IOException; - - /** * Asks SCM where a block should be allocated. SCM responds with the * set of datanodes that should be used creating this block. * @param size - size of the block. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index a60fbb2..e8d85e0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -38,19 +38,20 @@ public interface StorageContainerLocationProtocol {
    * set of datanodes that should be used creating this container.
    *
    */
-  Pipeline allocateContainer(HddsProtos.ReplicationType replicationType,
-      HddsProtos.ReplicationFactor factor, String containerName, String owner)
+  ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
+      HddsProtos.ReplicationFactor factor, String owner)
       throws IOException;
 
   /**
    * Ask SCM the location of the container. SCM responds with a group of
    * nodes where this container and its replicas are located.
    *
-   * @param containerName - Name of the container.
-   * @return Pipeline - the pipeline where container locates.
+   * @param containerID - ID of the container.
+   * @return ContainerInfo - the container info such as where the pipeline
+   *                         is located.
    * @throws IOException
    */
-  Pipeline getContainer(String containerName) throws IOException;
+  ContainerInfo getContainer(long containerID) throws IOException;
 
   /**
    * Ask SCM a list of containers with a range of container names
@@ -59,8 +60,7 @@ public interface StorageContainerLocationProtocol {
    * use prefix name to filter the result. the max size of the
    * searching range cannot exceed the value of count.
    *
-   * @param startName start name, if null, start searching at the head.
-   * @param prefixName prefix name, if null, then filter is disabled.
+   * @param startContainerID start container ID.
    * @param count count, if count < 0, the max size is unlimited.(
    *              Usually the count will be replace with a very big
    *              value instead of being unlimited in case the db is very big)
@@ -68,18 +68,18 @@ public interface StorageContainerLocationProtocol {
    *
    * @return a list of container.
    * @throws IOException
    */
-  List<ContainerInfo> listContainer(String startName, String prefixName,
-      int count) throws IOException;
+  List<ContainerInfo> listContainer(long startContainerID, int count)
+      throws IOException;
 
   /**
    * Deletes a container in SCM.
    *
-   * @param containerName
+   * @param containerID
    * @throws IOException
    *   if failed to delete the container mapping from db store
    *   or container doesn't exist.
    */
-  void deleteContainer(String containerName) throws IOException;
+  void deleteContainer(long containerID) throws IOException;
 
   /**
    * Queries a list of Node Statuses.
@@ -94,12 +94,12 @@ public interface StorageContainerLocationProtocol {
    * or containers on datanodes.
    * Container will be in Operational state after that.
    * @param type object type
-   * @param name object name
+   * @param id object id
    * @param op operation type (e.g., create, close, delete)
    * @param stage creation stage
    */
   void notifyObjectStageChange(
-      ObjectStageChangeRequestProto.Type type, String name,
+      ObjectStageChangeRequestProto.Type type, long id,
       ObjectStageChangeRequestProto.Op op,
       ObjectStageChangeRequestProto.Stage stage) throws IOException;
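[Editor's note: every admin entry point on this protocol is now keyed by the numeric ID. A quick sketch of the reworked surface; the client proxy name is assumed.]

    ContainerInfo info = scmLocationClient.getContainer(containerID);
    List<ContainerInfo> page = scmLocationClient.listContainer(0L, 100); // from ID 0, up to 100
    scmLocationClient.deleteContainer(containerID);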
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 0012f3e..aed0fb7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -17,10 +17,10 @@
 package org.apache.hadoop.hdds.scm.protocolPB;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -35,13 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
-    .GetScmBlockLocationsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
-    .GetScmBlockLocationsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .KeyBlocks;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
-    .ScmLocatedBlockProto;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -52,7 +46,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -82,41 +75,6 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
   }
 
   /**
-   * Find the set of nodes to read/write a block, as
-   * identified by the block key.  This method supports batch lookup by
-   * passing multiple keys.
-   *
-   * @param keys batch of block keys to find
-   * @return allocated blocks for each block key
-   * @throws IOException if there is any failure
-   */
-  @Override
-  public Set<AllocatedBlock> getBlockLocations(Set<String> keys)
-      throws IOException {
-    GetScmBlockLocationsRequestProto.Builder req =
-        GetScmBlockLocationsRequestProto.newBuilder();
-    for (String key : keys) {
-      req.addKeys(key);
-    }
-    final GetScmBlockLocationsResponseProto resp;
-    try {
-      resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER,
-          req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    Set<AllocatedBlock> locatedBlocks =
-        Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount());
-    for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) {
-      locatedBlocks.add(new AllocatedBlock.Builder()
-          .setKey(locatedBlock.getKey())
-          .setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline()))
-          .build());
-    }
-    return locatedBlocks;
-  }
-
-  /**
    * Asks SCM where a block should be allocated. SCM responds with the
    * set of datanodes that should be used creating this block.
    * @param size - size of the block.
@@ -144,7 +102,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
           response.getErrorMessage() : "Allocate block failed.");
     }
     AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
-        .setKey(response.getKey())
+        .setBlockID(BlockID.getFromProtobuf(response.getBlockID()))
         .setPipeline(Pipeline.getFromProtoBuf(response.getPipeline()))
         .setShouldCreateContainer(response.getCreateContainer());
     return builder.build();


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 3638f63..bba4e17 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.hdds.scm.protocolPB;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -92,20 +91,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
    * supports replication factor of either 1 or 3.
* @param type - Replication Type * @param factor - Replication Count - * @param containerName - Name * @return * @throws IOException */ @Override - public Pipeline allocateContainer(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor factor, String - containerName, String owner) throws IOException { + public ContainerInfo allocateContainer(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, String owner) throws IOException { - Preconditions.checkNotNull(containerName, "Container Name cannot be Null"); - Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" + - " be empty"); ContainerRequestProto request = ContainerRequestProto.newBuilder() - .setContainerName(containerName) .setReplicationFactor(factor) .setReplicationType(type) .setOwner(owner) @@ -121,22 +114,20 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB throw new IOException(response.hasErrorMessage() ? response.getErrorMessage() : "Allocate container failed."); } - return Pipeline.getFromProtoBuf(response.getPipeline()); + return ContainerInfo.fromProtobuf(response.getContainerInfo()); } - public Pipeline getContainer(String containerName) throws IOException { - Preconditions.checkNotNull(containerName, - "Container Name cannot be Null"); - Preconditions.checkState(!containerName.isEmpty(), - "Container name cannot be empty"); + public ContainerInfo getContainer(long containerID) throws IOException { + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative"); GetContainerRequestProto request = GetContainerRequestProto .newBuilder() - .setContainerName(containerName) + .setContainerID(containerID) .build(); try { GetContainerResponseProto response = rpcProxy.getContainer(NULL_RPC_CONTROLLER, request); - return Pipeline.getFromProtoBuf(response.getPipeline()); + return ContainerInfo.fromProtobuf(response.getContainerInfo()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -146,16 +137,15 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB * {@inheritDoc} */ @Override - public List<ContainerInfo> listContainer(String startName, String prefixName, - int count) throws IOException { + public List<ContainerInfo> listContainer(long startContainerID, int count) + throws IOException { + Preconditions.checkState(startContainerID >= 0, + "Container ID cannot be negative."); + Preconditions.checkState(count > 0, + "Container count must be greater than 0."); SCMListContainerRequestProto.Builder builder = SCMListContainerRequestProto .newBuilder(); - if (prefixName != null) { - builder.setPrefixName(prefixName); - } - if (startName != null) { - builder.setStartName(startName); - } + builder.setStartContainerID(startContainerID); builder.setCount(count); SCMListContainerRequestProto request = builder.build(); @@ -177,17 +167,17 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB * Ask SCM to delete a container by name. SCM will remove * the container mapping in its database. 
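To make the new client surface concrete, here is a usage sketch of the ID-based container RPCs above. The translator construction and the ContainerInfo#getContainerID accessor are assumed, as neither appears in this excerpt; the owner string is illustrative only.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;

/** Sketch: exercising the ID-based container RPCs from this diff. */
public final class ContainerIdUsageSketch {

  static void demo(StorageContainerLocationProtocolClientSideTranslatorPB scm)
      throws IOException {
    // SCM now assigns the container ID itself; callers pass only the
    // replication settings and an owner string.
    ContainerInfo created = scm.allocateContainer(
        HddsProtos.ReplicationType.STAND_ALONE,
        HddsProtos.ReplicationFactor.ONE,
        "demo");

    // Lookup, listing and deletion are all keyed by the numeric ID.
    ContainerInfo fetched = scm.getContainer(created.getContainerID());
    List<ContainerInfo> firstPage = scm.listContainer(0L, 100);
    scm.deleteContainer(created.getContainerID());
  }
}

Note that listing pages by a starting container ID rather than by name prefix, matching the preconditions added in listContainer above.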
* - * @param containerName + * @param containerID * @throws IOException */ @Override - public void deleteContainer(String containerName) + public void deleteContainer(long containerID) throws IOException { - Preconditions.checkState(!Strings.isNullOrEmpty(containerName), - "Container name cannot be null or empty"); + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative"); SCMDeleteContainerRequestProto request = SCMDeleteContainerRequestProto .newBuilder() - .setContainerName(containerName) + .setContainerID(containerID) .build(); try { rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request); @@ -226,21 +216,21 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB /** * Notify from client that creates object on datanodes. * @param type object type - * @param name object name + * @param id object id * @param op operation type (e.g., create, close, delete) * @param stage object creation stage : begin/complete */ @Override public void notifyObjectStageChange( - ObjectStageChangeRequestProto.Type type, String name, + ObjectStageChangeRequestProto.Type type, long id, ObjectStageChangeRequestProto.Op op, ObjectStageChangeRequestProto.Stage stage) throws IOException { - Preconditions.checkState(!Strings.isNullOrEmpty(name), - "Object name cannot be null or empty"); + Preconditions.checkState(id >= 0, + "Object id cannot be negative."); ObjectStageChangeRequestProto request = ObjectStageChangeRequestProto.newBuilder() .setType(type) - .setName(name) + .setId(id) .setOp(op) .setStage(stage) .build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 1559816..970e932 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.proto.ContainerProtos .WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue; +import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; @@ -79,7 +80,6 @@ public final class ContainerProtocolCalls { KeyData containerKeyData, String traceID) throws IOException { GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) .setKeyData(containerKeyData); String id = xceiverClient.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto @@ -106,7 +106,6 @@ public final class ContainerProtocolCalls { KeyData containerKeyData, String traceID) throws IOException { PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) .setKeyData(containerKeyData); String id = xceiverClient.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto @@ -125,18 +124,16 @@ public final class ContainerProtocolCalls { * * @param xceiverClient client to perform call * @param chunk 
information about chunk to read - * @param key the key name + * @param blockID ID of the block * @param traceID container protocol call args * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, String key, String traceID) - throws IOException { + ChunkInfo chunk, BlockID blockID, String traceID) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) + .setBlockID(blockID.getProtobuf()) .setChunkData(chunk); String id = xceiverClient.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto @@ -156,18 +153,17 @@ public final class ContainerProtocolCalls { * * @param xceiverClient client to perform call * @param chunk information about chunk to write - * @param key the key name + * @param blockID ID of the block * @param data the data of the chunk to write * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, - String key, ByteString data, String traceID) + BlockID blockID, ByteString data, String traceID) throws IOException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) + .setBlockID(blockID.getProtobuf()) .setChunkData(chunk) .setData(data); String id = xceiverClient.getPipeline().getLeader().getUuidString(); @@ -189,30 +185,29 @@ public final class ContainerProtocolCalls { * than 1 MB. * * @param client - client that communicates with the container. - * @param containerName - Name of the container - * @param key - Name of the Key + * @param blockID - ID of the block * @param data - Data to be written into the container. * @param traceID - Trace ID for logging purpose. * @throws IOException */ public static void writeSmallFile(XceiverClientSpi client, - String containerName, String key, byte[] data, String traceID) + BlockID blockID, byte[] data, String traceID) throws IOException { KeyData containerKeyData = - KeyData.newBuilder().setContainerName(containerName).setName(key) + KeyData.newBuilder().setBlockID(blockID.getProtobuf()) .build(); PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto.newBuilder() - .setPipeline(client.getPipeline().getProtobufMessage()) .setKeyData(containerKeyData); KeyValue keyValue = KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true") .build(); ChunkInfo chunk = - ChunkInfo.newBuilder().setChunkName(key + "_chunk").setOffset(0) - .setLen(data.length).addMetadata(keyValue).build(); + ChunkInfo.newBuilder().setChunkName(blockID.getLocalID() + + "_chunk").setOffset(0).setLen(data.length). + addMetadata(keyValue).build(); PutSmallFileRequestProto putSmallFileRequest = PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk) @@ -234,17 +229,18 @@ public final class ContainerProtocolCalls { /** * createContainer call that creates a container on the datanode. 
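The chunk-level calls follow the same pattern. Below is a minimal sketch of a write/read round trip with the new signatures, assuming an already-connected XceiverClientSpi and a populated ChunkInfo (both are wired up elsewhere in the codebase):

import java.io.IOException;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;

/** Sketch: chunk I/O addressed by BlockID rather than a key name. */
public final class ChunkIoSketch {

  static void roundTrip(XceiverClientSpi client, ChunkInfo chunk,
      long containerID, long localID, ByteString data, String traceID)
      throws IOException {
    BlockID blockID = new BlockID(containerID, localID);

    // Write the chunk under its block's (containerID, localID) identity.
    ContainerProtocolCalls.writeChunk(client, chunk, blockID, data, traceID);

    // Read it back with the same identifier.
    ReadChunkResponseProto response =
        ContainerProtocolCalls.readChunk(client, chunk, blockID, traceID);
  }
}

The pipeline no longer travels in each request; only the block identity does, since the datanode can resolve the container locally from the ID.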
* @param client - client + * @param containerID - ID of container * @param traceID - traceID * @throws IOException */ - public static void createContainer(XceiverClientSpi client, String traceID) - throws IOException { + public static void createContainer(XceiverClientSpi client, long containerID, + String traceID) throws IOException { ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto .newBuilder(); ContainerProtos.ContainerData.Builder containerData = ContainerProtos .ContainerData.newBuilder(); - containerData.setName(client.getPipeline().getContainerName()); + containerData.setContainerID(containerID); createRequest.setPipeline(client.getPipeline().getProtobufMessage()); createRequest.setContainerData(containerData.build()); @@ -268,12 +264,11 @@ public final class ContainerProtocolCalls { * @param traceID * @throws IOException */ - public static void deleteContainer(XceiverClientSpi client, + public static void deleteContainer(XceiverClientSpi client, long containerID, boolean force, String traceID) throws IOException { ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest = ContainerProtos.DeleteContainerRequestProto.newBuilder(); - deleteRequest.setName(client.getPipeline().getContainerName()); - deleteRequest.setPipeline(client.getPipeline().getProtobufMessage()); + deleteRequest.setContainerID(containerID); deleteRequest.setForceDelete(force); String id = client.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto.Builder request = @@ -291,14 +286,15 @@ public final class ContainerProtocolCalls { * Close a container. * * @param client + * @param containerID * @param traceID * @throws IOException */ - public static void closeContainer(XceiverClientSpi client, String traceID) - throws IOException { + public static void closeContainer(XceiverClientSpi client, + long containerID, String traceID) throws IOException { ContainerProtos.CloseContainerRequestProto.Builder closeRequest = ContainerProtos.CloseContainerRequestProto.newBuilder(); - closeRequest.setPipeline(client.getPipeline().getProtobufMessage()); + closeRequest.setContainerID(containerID); String id = client.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto.Builder request = @@ -320,11 +316,11 @@ public final class ContainerProtocolCalls { * @throws IOException */ public static ReadContainerResponseProto readContainer( - XceiverClientSpi client, String containerName, + XceiverClientSpi client, long containerID, String traceID) throws IOException { ReadContainerRequestProto.Builder readRequest = ReadContainerRequestProto.newBuilder(); - readRequest.setName(containerName); + readRequest.setContainerID(containerID); readRequest.setPipeline(client.getPipeline().getProtobufMessage()); String id = client.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto.Builder request = @@ -340,25 +336,23 @@ public final class ContainerProtocolCalls { } /** - * Reads the data given the container name and key. 
+ * Reads the data given the blockID * * @param client - * @param containerName - name of the container - * @param key - key + * @param blockID - ID of the block * @param traceID - trace ID * @return GetSmallFileResponseProto * @throws IOException */ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, - String containerName, String key, String traceID) throws IOException { + BlockID blockID, String traceID) throws IOException { KeyData containerKeyData = KeyData .newBuilder() - .setContainerName(containerName) - .setName(key).build(); + .setBlockID(blockID.getProtobuf()) + .build(); GetKeyRequestProto.Builder getKey = GetKeyRequestProto .newBuilder() - .setPipeline(client.getPipeline().getProtobufMessage()) .setKeyData(containerKeyData); ContainerProtos.GetSmallFileRequestProto getSmallFileRequest = GetSmallFileRequestProto http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java index 38ce6cc..7a5403f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java @@ -17,9 +17,12 @@ package org.apache.hadoop.ozone.common; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .KeyBlocks; +import java.util.ArrayList; import java.util.List; /** @@ -28,13 +31,13 @@ import java.util.List; public final class BlockGroup { private String groupID; - private List<String> blockIDs; - private BlockGroup(String groupID, List<String> blockIDs) { + private List<BlockID> blockIDs; + private BlockGroup(String groupID, List<BlockID> blockIDs) { this.groupID = groupID; this.blockIDs = blockIDs; } - public List<String> getBlockIDList() { + public List<BlockID> getBlockIDList() { return blockIDs; } @@ -43,8 +46,11 @@ public final class BlockGroup { } public KeyBlocks getProto() { - return KeyBlocks.newBuilder().setKey(groupID) - .addAllBlocks(blockIDs).build(); + KeyBlocks.Builder kbb = KeyBlocks.newBuilder(); + for (BlockID block : blockIDs) { + kbb.addBlocks(block.getProtobuf()); + } + return kbb.setKey(groupID).build(); } /** @@ -53,8 +59,12 @@ public final class BlockGroup { * @return a group of blocks. 
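Taken together, the ContainerProtocolCalls changes above give a fully ID-addressed datanode lifecycle. A sketch using only the signatures shown in this diff, again assuming the client wiring exists:

import java.io.IOException;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.GetSmallFileResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;

/** Sketch: an ID-addressed container lifecycle on the datanode. */
public final class ContainerLifecycleSketch {

  static void demo(XceiverClientSpi client, long containerID,
      BlockID blockID, byte[] payload, String traceID) throws IOException {
    // Create the container under an SCM-assigned numeric ID.
    ContainerProtocolCalls.createContainer(client, containerID, traceID);

    // Store and fetch a small file purely by BlockID.
    ContainerProtocolCalls.writeSmallFile(client, blockID, payload, traceID);
    GetSmallFileResponseProto smallFile =
        ContainerProtocolCalls.readSmallFile(client, blockID, traceID);

    // Close, then force-delete, again by ID.
    ContainerProtocolCalls.closeContainer(client, containerID, traceID);
    ContainerProtocolCalls.deleteContainer(client, containerID, true, traceID);
  }
}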
*/ public static BlockGroup getFromProto(KeyBlocks proto) { + List<BlockID> blockIDs = new ArrayList<>(); + for (HddsProtos.BlockID block : proto.getBlocksList()) { + blockIDs.add(new BlockID(block.getContainerID(), block.getLocalID())); + } return BlockGroup.newBuilder().setKeyName(proto.getKey()) - .addAllBlockIDs(proto.getBlocksList()).build(); + .addAllBlockIDs(blockIDs).build(); } public static Builder newBuilder() { @@ -67,14 +77,14 @@ public final class BlockGroup { public static class Builder { private String groupID; - private List<String> blockIDs; + private List<BlockID> blockIDs; public Builder setKeyName(String blockGroupID) { this.groupID = blockGroupID; return this; } - public Builder addAllBlockIDs(List<String> keyBlocks) { + public Builder addAllBlockIDs(List<BlockID> keyBlocks) { this.blockIDs = keyBlocks; return this; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java index ec54ac5..892b695 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.common; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .DeleteScmBlockResult; @@ -52,7 +53,7 @@ public class DeleteBlockGroupResult { new ArrayList<>(blockResultList.size()); for (DeleteBlockResult result : blockResultList) { DeleteScmBlockResult proto = DeleteScmBlockResult.newBuilder() - .setKey(result.getKey()) + .setBlockID(result.getBlockID().getProtobuf()) .setResult(result.getResult()).build(); resultProtoList.add(proto); } @@ -63,8 +64,8 @@ public class DeleteBlockGroupResult { List<DeleteScmBlockResult> results) { List<DeleteBlockResult> protoResults = new ArrayList<>(results.size()); for (DeleteScmBlockResult result : results) { - protoResults.add(new DeleteBlockResult(result.getKey(), - result.getResult())); + protoResults.add(new DeleteBlockResult(BlockID.getFromProtobuf( + result.getBlockID()), result.getResult())); } return protoResults; } @@ -87,10 +88,10 @@ public class DeleteBlockGroupResult { /** * @return A list of deletion failed block IDs. 
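A BlockGroup round trip with the new structured IDs, using only the builder methods shown above (the group key string and ID values are illustrative):

import java.util.Arrays;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
import org.apache.hadoop.ozone.common.BlockGroup;

/** Sketch: a BlockGroup round trip with structured BlockIDs. */
public final class BlockGroupSketch {

  public static void main(String[] args) {
    BlockGroup group = BlockGroup.newBuilder()
        .setKeyName("volume/bucket/key")       // illustrative group id
        .addAllBlockIDs(Arrays.asList(
            new BlockID(1L, 100L),             // (containerID, localID)
            new BlockID(1L, 101L)))
        .build();

    KeyBlocks proto = group.getProto();                  // to protobuf
    BlockGroup parsed = BlockGroup.getFromProto(proto);  // and back
  }
}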
*/ - public List<String> getFailedBlocks() { - List<String> failedBlocks = blockResultList.stream() + public List<BlockID> getFailedBlocks() { + List<BlockID> failedBlocks = blockResultList.stream() .filter(result -> result.getResult() != Result.success) - .map(DeleteBlockResult::getKey).collect(Collectors.toList()); + .map(DeleteBlockResult::getBlockID).collect(Collectors.toList()); return failedBlocks; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java index be546c7..c3de5ed 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.helpers; import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; import java.util.Collections; @@ -30,8 +31,7 @@ import java.util.TreeMap; * Helper class to convert Protobuf to Java classes. */ public class KeyData { - private final String containerName; - private final String keyName; + private final BlockID blockID; private final Map<String, String> metadata; /** @@ -44,12 +44,10 @@ public class KeyData { /** * Constructs a KeyData Object. * - * @param containerName - * @param keyName + * @param blockID */ - public KeyData(String containerName, String keyName) { - this.containerName = containerName; - this.keyName = keyName; + public KeyData(BlockID blockID) { + this.blockID = blockID; this.metadata = new TreeMap<>(); } @@ -62,7 +60,7 @@ public class KeyData { */ public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws IOException { - KeyData keyData = new KeyData(data.getContainerName(), data.getName()); + KeyData keyData = new KeyData(BlockID.getFromProtobuf(data.getBlockID())); for (int x = 0; x < data.getMetadataCount(); x++) { keyData.addMetadata(data.getMetadata(x).getKey(), data.getMetadata(x).getValue()); @@ -78,8 +76,7 @@ public class KeyData { public ContainerProtos.KeyData getProtoBufMessage() { ContainerProtos.KeyData.Builder builder = ContainerProtos.KeyData.newBuilder(); - builder.setContainerName(this.containerName); - builder.setName(this.getKeyName()); + builder.setBlockID(this.blockID.getProtobuf()); builder.addAllChunks(this.chunks); for (Map.Entry<String, String> entry : metadata.entrySet()) { HddsProtos.KeyValue.Builder keyValBuilder = @@ -135,19 +132,27 @@ public class KeyData { } /** - * Returns container Name. - * @return String. + * Returns container ID. + * @return long. */ - public String getContainerName() { - return containerName; + public long getContainerID() { + return blockID.getContainerID(); } /** - * Returns KeyName. - * @return String. + * Returns LocalID. + * @return long. */ - public String getKeyName() { - return keyName; + public long getLocalID() { + return blockID.getLocalID(); + } + + /** + * Return Block ID. + * @return BlockID. 
+ */ + public BlockID getBlockID() { + return blockID; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java index fa79341..37a1309 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.ozone.protocolPB; -import com.google.common.collect.Sets; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; @@ -38,18 +37,11 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .DeleteScmKeyBlocksRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .DeleteScmKeyBlocksResponseProto; -import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .GetScmBlockLocationsRequestProto; -import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .GetScmBlockLocationsResponseProto; -import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .ScmLocatedBlockProto; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; import java.io.IOException; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; /** @@ -73,34 +65,6 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB this.impl = impl; } - - @Override - public GetScmBlockLocationsResponseProto getScmBlockLocations( - RpcController controller, GetScmBlockLocationsRequestProto req) - throws ServiceException { - Set<String> keys = Sets.newLinkedHashSetWithExpectedSize( - req.getKeysCount()); - for (String key : req.getKeysList()) { - keys.add(key); - } - final Set<AllocatedBlock> blocks; - try { - blocks = impl.getBlockLocations(keys); - } catch (IOException ex) { - throw new ServiceException(ex); - } - GetScmBlockLocationsResponseProto.Builder resp = - GetScmBlockLocationsResponseProto.newBuilder(); - for (AllocatedBlock block: blocks) { - ScmLocatedBlockProto.Builder locatedBlock = - ScmLocatedBlockProto.newBuilder() - .setKey(block.getKey()) - .setPipeline(block.getPipeline().getProtobufMessage()); - resp.addLocatedBlocks(locatedBlock.build()); - } - return resp.build(); - } - @Override public AllocateScmBlockResponseProto allocateScmBlock( RpcController controller, AllocateScmBlockRequestProto request) @@ -112,7 +76,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB if (allocatedBlock != null) { return AllocateScmBlockResponseProto.newBuilder() - .setKey(allocatedBlock.getKey()) + .setBlockID(allocatedBlock.getBlockID().getProtobuf()) .setPipeline(allocatedBlock.getPipeline().getProtobufMessage()) .setCreateContainer(allocatedBlock.getCreateContainer()) .setErrorCode(AllocateScmBlockResponseProto.Error.success)
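To close the loop on the KeyData change shown earlier: the class is now a thin wrapper over a BlockID, with container ID and local ID exposed as views over the same identifier. A sketch using only the accessors from this diff (the metadata key/value is illustrative):

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;

/** Sketch: KeyData keyed by BlockID, using only accessors from this diff. */
public final class KeyDataSketch {

  public static void main(String[] args) throws Exception {
    KeyData keyData = new KeyData(new BlockID(7L, 42L));
    keyData.addMetadata("creator", "demo");       // illustrative metadata

    long containerID = keyData.getContainerID();  // 7, from the BlockID
    long localID = keyData.getLocalID();          // 42, from the BlockID
    BlockID id = keyData.getBlockID();            // the identifier itself
  }
}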