http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java deleted file mode 100644 index 274f859..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.protocol;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java deleted file mode 100644 index 0de759f..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.scm.protocolPB; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.ScmLocatedBlockProto; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks; -import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.scm.ScmInfo; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * This class is the client-side translator to translate the requests made on - * the {@link ScmBlockLocationProtocol} interface to the RPC server - * implementing {@link ScmBlockLocationProtocolPB}. - */ -@InterfaceAudience.Private -public final class ScmBlockLocationProtocolClientSideTranslatorPB - implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable { - - /** - * RpcController is not used and hence is set to null. - */ - private static final RpcController NULL_RPC_CONTROLLER = null; - - private final ScmBlockLocationProtocolPB rpcProxy; - - /** - * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. - * - * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy - */ - public ScmBlockLocationProtocolClientSideTranslatorPB( - ScmBlockLocationProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; - } - - /** - * Find the set of nodes to read/write a block, as - * identified by the block key. This method supports batch lookup by - * passing multiple keys. - * - * @param keys batch of block keys to find - * @return allocated blocks for each block key - * @throws IOException if there is any failure - */ - @Override - public Set<AllocatedBlock> getBlockLocations(Set<String> keys) - throws IOException { - GetScmBlockLocationsRequestProto.Builder req = - GetScmBlockLocationsRequestProto.newBuilder(); - for (String key : keys) { - req.addKeys(key); - } - final GetScmBlockLocationsResponseProto resp; - try { - resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER, - req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - Set<AllocatedBlock> locatedBlocks = - Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount()); - for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) { - locatedBlocks.add(new AllocatedBlock.Builder() - .setKey(locatedBlock.getKey()) - .setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline())) - .build()); - } - return locatedBlocks; - } - - /** - * Asks SCM where a block should be allocated. SCM responds with the - * set of datanodes that should be used creating this block. - * @param size - size of the block. - * @return allocated block accessing info (key, pipeline). - * @throws IOException - */ - @Override - public AllocatedBlock allocateBlock(long size, - HdslProtos.ReplicationType type, HdslProtos.ReplicationFactor factor, - String owner) throws IOException { - Preconditions.checkArgument(size > 0, "block size must be greater than 0"); - - AllocateScmBlockRequestProto request = - AllocateScmBlockRequestProto.newBuilder().setSize(size).setType(type) - .setFactor(factor).setOwner(owner).build(); - final AllocateScmBlockResponseProto response; - try { - response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - if (response.getErrorCode() != - AllocateScmBlockResponseProto.Error.success) { - throw new IOException(response.hasErrorMessage() ? - response.getErrorMessage() : "Allocate block failed."); - } - AllocatedBlock.Builder builder = new AllocatedBlock.Builder() - .setKey(response.getKey()) - .setPipeline(Pipeline.getFromProtoBuf(response.getPipeline())) - .setShouldCreateContainer(response.getCreateContainer()); - return builder.build(); - } - - /** - * Delete the set of keys specified. - * - * @param keyBlocksInfoList batch of block keys to delete. - * @return list of block deletion results. - * @throws IOException if there is any failure. - * - */ - @Override - public List<DeleteBlockGroupResult> deleteKeyBlocks( - List<BlockGroup> keyBlocksInfoList) throws IOException { - List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream() - .map(BlockGroup::getProto).collect(Collectors.toList()); - DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto - .newBuilder().addAllKeyBlocks(keyBlocksProto).build(); - - final DeleteScmKeyBlocksResponseProto resp; - try { - resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - List<DeleteBlockGroupResult> results = - new ArrayList<>(resp.getResultsCount()); - results.addAll(resp.getResultsList().stream().map( - result -> new DeleteBlockGroupResult(result.getObjectKey(), - DeleteBlockGroupResult - .convertBlockResultProto(result.getBlockResultsList()))) - .collect(Collectors.toList())); - return results; - } - - /** - * Gets the cluster Id and Scm Id from SCM. - * @return ScmInfo - * @throws IOException - */ - @Override - public ScmInfo getScmInfo() throws IOException { - HdslProtos.GetScmInfoRequestProto request = - HdslProtos.GetScmInfoRequestProto.getDefaultInstance(); - HdslProtos.GetScmInfoRespsonseProto resp; - try { - resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - ScmInfo.Builder builder = new ScmInfo.Builder() - .setClusterId(resp.getClusterId()) - .setScmId(resp.getScmId()); - return builder.build(); - } - - @Override - public Object getUnderlyingProxyObject() { - return rpcProxy; - } - - @Override - public void close() { - RPC.stopProxy(rpcProxy); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java deleted file mode 100644 index 019aeeb..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm.protocolPB; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos - .ScmBlockLocationProtocolService; - -/** - * Protocol used from an HDFS node to StorageContainerManager. This extends the - * Protocol Buffers service interface to add Hadoop-specific annotations. - */ -@ProtocolInfo(protocolName = - "org.apache.hadoop.ozone.protocol.ScmBlockLocationProtocol", - protocolVersion = 1) -@InterfaceAudience.Private -public interface ScmBlockLocationProtocolPB - extends ScmBlockLocationProtocolService.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java deleted file mode 100644 index 348b266..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.scm.protocolPB; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.scm.ScmInfo; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; - -/** - * This class is the client-side translator to translate the requests made on - * the {@link StorageContainerLocationProtocol} interface to the RPC server - * implementing {@link StorageContainerLocationProtocolPB}. - */ -@InterfaceAudience.Private -public final class StorageContainerLocationProtocolClientSideTranslatorPB - implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable { - - /** - * RpcController is not used and hence is set to null. - */ - private static final RpcController NULL_RPC_CONTROLLER = null; - - private final StorageContainerLocationProtocolPB rpcProxy; - - /** - * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. - * - * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy - */ - public StorageContainerLocationProtocolClientSideTranslatorPB( - StorageContainerLocationProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; - } - - /** - * Asks SCM where a container should be allocated. SCM responds with the set - * of datanodes that should be used creating this container. Ozone/SCM only - * supports replication factor of either 1 or 3. - * @param type - Replication Type - * @param factor - Replication Count - * @param containerName - Name - * @return - * @throws IOException - */ - @Override - public Pipeline allocateContainer(HdslProtos.ReplicationType type, - HdslProtos.ReplicationFactor factor, String - containerName, String owner) throws IOException { - - Preconditions.checkNotNull(containerName, "Container Name cannot be Null"); - Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" + - " be empty"); - ContainerRequestProto request = ContainerRequestProto.newBuilder() - .setContainerName(containerName) - .setReplicationFactor(factor) - .setReplicationType(type) - .setOwner(owner) - .build(); - - final ContainerResponseProto response; - try { - response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - if (response.getErrorCode() != ContainerResponseProto.Error.success) { - throw new IOException(response.hasErrorMessage() ? - response.getErrorMessage() : "Allocate container failed."); - } - return Pipeline.getFromProtoBuf(response.getPipeline()); - } - - public Pipeline getContainer(String containerName) throws IOException { - Preconditions.checkNotNull(containerName, - "Container Name cannot be Null"); - Preconditions.checkState(!containerName.isEmpty(), - "Container name cannot be empty"); - GetContainerRequestProto request = GetContainerRequestProto - .newBuilder() - .setContainerName(containerName) - .build(); - try { - GetContainerResponseProto response = - rpcProxy.getContainer(NULL_RPC_CONTROLLER, request); - return Pipeline.getFromProtoBuf(response.getPipeline()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<ContainerInfo> listContainer(String startName, String prefixName, - int count) throws IOException { - SCMListContainerRequestProto.Builder builder = SCMListContainerRequestProto - .newBuilder(); - if (prefixName != null) { - builder.setPrefixName(prefixName); - } - if (startName != null) { - builder.setStartName(startName); - } - builder.setCount(count); - SCMListContainerRequestProto request = builder.build(); - - try { - SCMListContainerResponseProto response = - rpcProxy.listContainer(NULL_RPC_CONTROLLER, request); - List<ContainerInfo> containerList = new ArrayList<>(); - for (HdslProtos.SCMContainerInfo containerInfoProto : response - .getContainersList()) { - containerList.add(ContainerInfo.fromProtobuf(containerInfoProto)); - } - return containerList; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * Ask SCM to delete a container by name. SCM will remove - * the container mapping in its database. - * - * @param containerName - * @throws IOException - */ - @Override - public void deleteContainer(String containerName) - throws IOException { - Preconditions.checkState(!Strings.isNullOrEmpty(containerName), - "Container name cannot be null or empty"); - SCMDeleteContainerRequestProto request = SCMDeleteContainerRequestProto - .newBuilder() - .setContainerName(containerName) - .build(); - try { - rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * Queries a list of Node Statuses. - * - * @param nodeStatuses - * @return List of Datanodes. - */ - @Override - public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState> - nodeStatuses, HdslProtos.QueryScope queryScope, String poolName) - throws IOException { - // TODO : We support only cluster wide query right now. So ignoring checking - // queryScope and poolName - Preconditions.checkNotNull(nodeStatuses); - Preconditions.checkState(nodeStatuses.size() > 0); - NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder() - .addAllQuery(nodeStatuses) - .setScope(queryScope).setPoolName(poolName).build(); - try { - NodeQueryResponseProto response = - rpcProxy.queryNode(NULL_RPC_CONTROLLER, request); - return response.getDatanodes(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - - } - - /** - * Notify from client that creates object on datanodes. - * @param type object type - * @param name object name - * @param op operation type (e.g., create, close, delete) - * @param stage object creation stage : begin/complete - */ - @Override - public void notifyObjectStageChange( - ObjectStageChangeRequestProto.Type type, String name, - ObjectStageChangeRequestProto.Op op, - ObjectStageChangeRequestProto.Stage stage) throws IOException { - Preconditions.checkState(!Strings.isNullOrEmpty(name), - "Object name cannot be null or empty"); - ObjectStageChangeRequestProto request = - ObjectStageChangeRequestProto.newBuilder() - .setType(type) - .setName(name) - .setOp(op) - .setStage(stage) - .build(); - try { - rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request); - } catch(ServiceException e){ - throw ProtobufHelper.getRemoteException(e); - } - } - - /** - * Creates a replication pipeline of a specified type. - * - * @param replicationType - replication type - * @param factor - factor 1 or 3 - * @param nodePool - optional machine list to build a pipeline. - * @throws IOException - */ - @Override - public Pipeline createReplicationPipeline(HdslProtos.ReplicationType - replicationType, HdslProtos.ReplicationFactor factor, HdslProtos - .NodePool nodePool) throws IOException { - PipelineRequestProto request = PipelineRequestProto.newBuilder() - .setNodePool(nodePool) - .setReplicationFactor(factor) - .setReplicationType(replicationType) - .build(); - try { - PipelineResponseProto response = - rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request); - if (response.getErrorCode() == - PipelineResponseProto.Error.success) { - Preconditions.checkState(response.hasPipeline(), "With success, " + - "must come a pipeline"); - return Pipeline.getFromProtoBuf(response.getPipeline()); - } else { - String errorMessage = String.format("create replication pipeline " + - "failed. code : %s Message: %s", response.getErrorCode(), - response.hasErrorMessage() ? response.getErrorMessage() : ""); - throw new IOException(errorMessage); - } - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public ScmInfo getScmInfo() throws IOException { - HdslProtos.GetScmInfoRequestProto request = - HdslProtos.GetScmInfoRequestProto.getDefaultInstance(); - try { - HdslProtos.GetScmInfoRespsonseProto resp = rpcProxy.getScmInfo( - NULL_RPC_CONTROLLER, request); - ScmInfo.Builder builder = new ScmInfo.Builder() - .setClusterId(resp.getClusterId()) - .setScmId(resp.getScmId()); - return builder.build(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - - } - - @Override - public Object getUnderlyingProxyObject() { - return rpcProxy; - } - - @Override - public void close() { - RPC.stopProxy(rpcProxy); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java deleted file mode 100644 index b8c2958..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm.protocolPB; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService; - -/** - * Protocol used from an HDFS node to StorageContainerManager. This extends the - * Protocol Buffers service interface to add Hadoop-specific annotations. - */ -@ProtocolInfo(protocolName = - "org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol", - protocolVersion = 1) -@InterfaceAudience.Private -public interface StorageContainerLocationProtocolPB - extends StorageContainerLocationProtocolService.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java deleted file mode 100644 index f9a2c09..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.protocolPB; - -/** - * This package contains classes for the client of the storage container - * protocol. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java deleted file mode 100644 index 174f1c1..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.storage; - -import com.google.protobuf.ByteString; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .GetKeyRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .GetKeyResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .GetSmallFileRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .GetSmallFileResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.KeyData; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .PutKeyRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .PutSmallFileRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ReadChunkRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ReadChunkResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.Type; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .WriteChunkRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ReadContainerResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ReadContainerRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.KeyValue; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; - -import java.io.IOException; -import org.apache.hadoop.scm.XceiverClientSpi; - -/** - * Implementation of all container protocol calls performed by Container - * clients. - */ -public final class ContainerProtocolCalls { - - /** - * There is no need to instantiate this class. - */ - private ContainerProtocolCalls() { - } - - /** - * Calls the container protocol to get a container key. - * - * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container - * @param traceID container protocol call args - * @return container protocol get key response - * @throws IOException if there is an I/O error while performing the call - */ - public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient, - KeyData containerKeyData, String traceID) throws IOException { - GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetKey) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setGetKey(readKeyRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - return response.getGetKey(); - } - - /** - * Calls the container protocol to put a container key. - * - * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container - * @param traceID container protocol call args - * @throws IOException if there is an I/O error while performing the call - */ - public static void putKey(XceiverClientSpi xceiverClient, - KeyData containerKeyData, String traceID) throws IOException { - PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.PutKey) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setPutKey(createKeyRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - } - - /** - * Calls the container protocol to read a chunk. - * - * @param xceiverClient client to perform call - * @param chunk information about chunk to read - * @param key the key name - * @param traceID container protocol call args - * @return container protocol read chunk response - * @throws IOException if there is an I/O error while performing the call - */ - public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, String key, String traceID) - throws IOException { - ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) - .setChunkData(chunk); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.ReadChunk) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setReadChunk(readChunkRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - return response.getReadChunk(); - } - - /** - * Calls the container protocol to write a chunk. - * - * @param xceiverClient client to perform call - * @param chunk information about chunk to write - * @param key the key name - * @param data the data of the chunk to write - * @param traceID container protocol call args - * @throws IOException if there is an I/O error while performing the call - */ - public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, - String key, ByteString data, String traceID) - throws IOException { - WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) - .setChunkData(chunk) - .setData(data); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.WriteChunk) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setWriteChunk(writeChunkRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - } - - /** - * Allows writing a small file using single RPC. This takes the container - * name, key name and data to write sends all that data to the container using - * a single RPC. This API is designed to be used for files which are smaller - * than 1 MB. - * - * @param client - client that communicates with the container. - * @param containerName - Name of the container - * @param key - Name of the Key - * @param data - Data to be written into the container. - * @param traceID - Trace ID for logging purpose. - * @throws IOException - */ - public static void writeSmallFile(XceiverClientSpi client, - String containerName, String key, byte[] data, String traceID) - throws IOException { - - KeyData containerKeyData = - KeyData.newBuilder().setContainerName(containerName).setName(key) - .build(); - PutKeyRequestProto.Builder createKeyRequest = - PutKeyRequestProto.newBuilder() - .setPipeline(client.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - - KeyValue keyValue = - KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true") - .build(); - ChunkInfo chunk = - ChunkInfo.newBuilder().setChunkName(key + "_chunk").setOffset(0) - .setLen(data.length).addMetadata(keyValue).build(); - - PutSmallFileRequestProto putSmallFileRequest = - PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk) - .setKey(createKeyRequest).setData(ByteString.copyFrom(data)) - .build(); - - String id = client.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = - ContainerCommandRequestProto.newBuilder() - .setCmdType(Type.PutSmallFile) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setPutSmallFile(putSmallFileRequest) - .build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - } - - /** - * createContainer call that creates a container on the datanode. - * @param client - client - * @param traceID - traceID - * @throws IOException - */ - public static void createContainer(XceiverClientSpi client, String traceID) - throws IOException { - ContainerProtos.CreateContainerRequestProto.Builder createRequest = - ContainerProtos.CreateContainerRequestProto - .newBuilder(); - ContainerProtos.ContainerData.Builder containerData = ContainerProtos - .ContainerData.newBuilder(); - containerData.setName(client.getPipeline().getContainerName()); - createRequest.setPipeline(client.getPipeline().getProtobufMessage()); - createRequest.setContainerData(containerData.build()); - - String id = client.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CreateContainer); - request.setCreateContainer(createRequest); - request.setDatanodeUuid(id); - request.setTraceID(traceID); - ContainerCommandResponseProto response = client.sendCommand( - request.build()); - validateContainerResponse(response); - } - - /** - * Deletes a container from a pipeline. - * - * @param client - * @param force whether or not to forcibly delete the container. - * @param traceID - * @throws IOException - */ - public static void deleteContainer(XceiverClientSpi client, - boolean force, String traceID) throws IOException { - ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest = - ContainerProtos.DeleteContainerRequestProto.newBuilder(); - deleteRequest.setName(client.getPipeline().getContainerName()); - deleteRequest.setPipeline(client.getPipeline().getProtobufMessage()); - deleteRequest.setForceDelete(force); - String id = client.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.DeleteContainer); - request.setDeleteContainer(deleteRequest); - request.setTraceID(traceID); - request.setDatanodeUuid(id); - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); - } - - /** - * Close a container. - * - * @param client - * @param traceID - * @throws IOException - */ - public static void closeContainer(XceiverClientSpi client, String traceID) - throws IOException { - ContainerProtos.CloseContainerRequestProto.Builder closeRequest = - ContainerProtos.CloseContainerRequestProto.newBuilder(); - closeRequest.setPipeline(client.getPipeline().getProtobufMessage()); - - String id = client.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(Type.CloseContainer); - request.setCloseContainer(closeRequest); - request.setTraceID(traceID); - request.setDatanodeUuid(id); - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); - } - - /** - * readContainer call that gets meta data from an existing container. - * - * @param client - client - * @param traceID - trace ID - * @throws IOException - */ - public static ReadContainerResponseProto readContainer( - XceiverClientSpi client, String containerName, - String traceID) throws IOException { - ReadContainerRequestProto.Builder readRequest = - ReadContainerRequestProto.newBuilder(); - readRequest.setName(containerName); - readRequest.setPipeline(client.getPipeline().getProtobufMessage()); - String id = client.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto.Builder request = - ContainerCommandRequestProto.newBuilder(); - request.setCmdType(Type.ReadContainer); - request.setReadContainer(readRequest); - request.setDatanodeUuid(id); - request.setTraceID(traceID); - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); - return response.getReadContainer(); - } - - /** - * Reads the data given the container name and key. - * - * @param client - * @param containerName - name of the container - * @param key - key - * @param traceID - trace ID - * @return GetSmallFileResponseProto - * @throws IOException - */ - public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, - String containerName, String key, String traceID) throws IOException { - KeyData containerKeyData = KeyData - .newBuilder() - .setContainerName(containerName) - .setName(key).build(); - - GetKeyRequestProto.Builder getKey = GetKeyRequestProto - .newBuilder() - .setPipeline(client.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - ContainerProtos.GetSmallFileRequestProto getSmallFileRequest = - GetSmallFileRequestProto - .newBuilder().setKey(getKey) - .build(); - String id = client.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetSmallFile) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setGetSmallFile(getSmallFileRequest) - .build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - return response.getGetSmallFile(); - } - - /** - * Validates a response from a container protocol call. Any non-successful - * return code is mapped to a corresponding exception and thrown. - * - * @param response container protocol call response - * @throws IOException if the container protocol call failed - */ - private static void validateContainerResponse( - ContainerCommandResponseProto response - ) throws StorageContainerException { - if (response.getResult() == ContainerProtos.Result.SUCCESS) { - return; - } - throw new StorageContainerException( - response.getMessage(), response.getResult()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java deleted file mode 100644 index aa89af0..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.storage; - -/** - * This package contains StorageContainerManager classes. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java deleted file mode 100644 index 2ff4e55..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.utils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; - -/** - * An abstract class for a background service in ozone. - * A background service schedules multiple child tasks in parallel - * in a certain period. In each interval, it waits until all the tasks - * finish execution and then schedule next interval. - */ -public abstract class BackgroundService { - - @VisibleForTesting - public static final Logger LOG = - LoggerFactory.getLogger(BackgroundService.class); - - // Executor to launch child tasks - private final ScheduledExecutorService exec; - private final ThreadGroup threadGroup; - private final ThreadFactory threadFactory; - private final String serviceName; - private final long interval; - private final long serviceTimeout; - private final TimeUnit unit; - private final PeriodicalTask service; - - public BackgroundService(String serviceName, long interval, - TimeUnit unit, int threadPoolSize, long serviceTimeout) { - this.interval = interval; - this.unit = unit; - this.serviceName = serviceName; - this.serviceTimeout = serviceTimeout; - threadGroup = new ThreadGroup(serviceName); - ThreadFactory tf = r -> new Thread(threadGroup, r); - threadFactory = new ThreadFactoryBuilder() - .setThreadFactory(tf) - .setDaemon(true) - .setNameFormat(serviceName + "#%d") - .build(); - exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory); - service = new PeriodicalTask(); - } - - protected ExecutorService getExecutorService() { - return this.exec; - } - - @VisibleForTesting - public int getThreadCount() { - return threadGroup.activeCount(); - } - - @VisibleForTesting - public void triggerBackgroundTaskForTesting() { - service.run(); - } - - // start service - public void start() { - exec.scheduleWithFixedDelay(service, 0, interval, unit); - } - - public abstract BackgroundTaskQueue getTasks(); - - /** - * Run one or more background tasks concurrently. - * Wait until all tasks to return the result. - */ - public class PeriodicalTask implements Runnable { - @Override - public synchronized void run() { - LOG.debug("Running background service : {}", serviceName); - BackgroundTaskQueue tasks = getTasks(); - if (tasks.isEmpty()) { - // No task found, or some problems to init tasks - // return and retry in next interval. - return; - } - - LOG.debug("Number of background tasks to execute : {}", tasks.size()); - CompletionService<BackgroundTaskResult> taskCompletionService = - new ExecutorCompletionService<>(exec); - - List<Future<BackgroundTaskResult>> results = Lists.newArrayList(); - while (tasks.size() > 0) { - BackgroundTask task = tasks.poll(); - Future<BackgroundTaskResult> result = - taskCompletionService.submit(task); - results.add(result); - } - - results.parallelStream().forEach(taskResultFuture -> { - try { - // Collect task results - BackgroundTaskResult result = serviceTimeout > 0 - ? taskResultFuture.get(serviceTimeout, TimeUnit.MILLISECONDS) - : taskResultFuture.get(); - if (LOG.isDebugEnabled()) { - LOG.debug("task execution result size {}", result.getSize()); - } - } catch (InterruptedException | ExecutionException e) { - LOG.warn( - "Background task fails to execute, " - + "retrying in next interval", e); - } catch (TimeoutException e) { - LOG.warn("Background task executes timed out, " - + "retrying in next interval", e); - } - }); - } - } - - // shutdown and make sure all threads are properly released. - public void shutdown() { - LOG.info("Shutting down service {}", this.serviceName); - exec.shutdown(); - try { - if (!exec.awaitTermination(60, TimeUnit.SECONDS)) { - exec.shutdownNow(); - } - } catch (InterruptedException e) { - exec.shutdownNow(); - } - if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) { - threadGroup.destroy(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java deleted file mode 100644 index 47e8ebc..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.utils; - -import java.util.concurrent.Callable; - -/** - * A task thread to run by {@link BackgroundService}. - */ -public interface BackgroundTask<T> extends Callable<T> { - - int getPriority(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java deleted file mode 100644 index b56ef0c..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.utils; - -import java.util.PriorityQueue; - -/** - * A priority queue that stores a number of {@link BackgroundTask}. - */ -public class BackgroundTaskQueue { - - private final PriorityQueue<BackgroundTask> tasks; - - public BackgroundTaskQueue() { - tasks = new PriorityQueue<>((task1, task2) - -> task1.getPriority() - task2.getPriority()); - } - - /** - * @return the head task in this queue. - */ - public synchronized BackgroundTask poll() { - return tasks.poll(); - } - - /** - * Add a {@link BackgroundTask} to the queue, - * the task will be sorted by its priority. - * - * @param task - */ - public synchronized void add(BackgroundTask task) { - tasks.add(task); - } - - /** - * @return true if the queue contains no task, false otherwise. - */ - public synchronized boolean isEmpty() { - return tasks.isEmpty(); - } - - /** - * @return the size of the queue. - */ - public synchronized int size() { - return tasks.size(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java deleted file mode 100644 index 198300f..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.utils; - -/** - * Result of a {@link BackgroundTask}. - */ -public interface BackgroundTaskResult { - - /** - * Returns the size of entries included in this result. - */ - int getSize(); - - /** - * An empty task result implementation. - */ - class EmptyTaskResult implements BackgroundTaskResult { - - public static EmptyTaskResult newResult() { - return new EmptyTaskResult(); - } - - @Override - public int getSize() { - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java deleted file mode 100644 index 47699eb..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.utils; - -import com.google.common.collect.Lists; - -import java.util.List; - -/** - * An utility class to store a batch of DB write operations. - */ -public class BatchOperation { - - /** - * Enum for write operations. - */ - public enum Operation { - DELETE, PUT - } - - private List<SingleOperation> operations = - Lists.newArrayList(); - - /** - * Add a PUT operation into the batch. - */ - public void put(byte[] key, byte[] value) { - operations.add(new SingleOperation(Operation.PUT, key, value)); - } - - /** - * Add a DELETE operation into the batch. - */ - public void delete(byte[] key) { - operations.add(new SingleOperation(Operation.DELETE, key, null)); - - } - - public List<SingleOperation> getOperations() { - return operations; - } - - /** - * A SingleOperation represents a PUT or DELETE operation - * and the data the operation needs to manipulates. - */ - public static class SingleOperation { - - private Operation opt; - private byte[] key; - private byte[] value; - - public SingleOperation(Operation opt, byte[] key, byte[] value) { - this.opt = opt; - if (key == null) { - throw new IllegalArgumentException("key cannot be null"); - } - this.key = key.clone(); - this.value = value == null ? null : value.clone(); - } - - public Operation getOpt() { - return opt; - } - - public byte[] getKey() { - return key.clone(); - } - - public byte[] getValue() { - return value == null ? null : value.clone(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java deleted file mode 100644 index c407398..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.utils; - -import java.io.IOException; - -/** - * A consumer for metadata store key-value entries. - * Used by {@link MetadataStore} class. - */ -@FunctionalInterface -public interface EntryConsumer { - - /** - * Consumes a key and value and produces a boolean result. - * @param key key - * @param value value - * @return a boolean value produced by the consumer - * @throws IOException - */ - boolean consume(byte[] key, byte[] value) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java deleted file mode 100644 index 72ac8d1..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.utils; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; -import org.fusesource.leveldbjni.JniDBFactory; -import org.iq80.leveldb.WriteBatch; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.Options; -import org.iq80.leveldb.WriteOptions; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Snapshot; -import org.iq80.leveldb.ReadOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Map; -import java.util.Map.Entry; - -/** - * LevelDB interface. - */ -public class LevelDBStore implements MetadataStore { - - private static final Logger LOG = - LoggerFactory.getLogger(LevelDBStore.class); - - private DB db; - private final File dbFile; - private final Options dbOptions; - private final WriteOptions writeOptions; - - public LevelDBStore(File dbPath, boolean createIfMissing) - throws IOException { - dbOptions = new Options(); - dbOptions.createIfMissing(createIfMissing); - this.dbFile = dbPath; - this.writeOptions = new WriteOptions().sync(true); - openDB(dbPath, dbOptions); - } - - /** - * Opens a DB file. - * - * @param dbPath - DB File path - * @throws IOException - */ - public LevelDBStore(File dbPath, Options options) - throws IOException { - dbOptions = options; - this.dbFile = dbPath; - this.writeOptions = new WriteOptions().sync(true); - openDB(dbPath, dbOptions); - } - - private void openDB(File dbPath, Options options) throws IOException { - dbPath.getParentFile().mkdirs(); - db = JniDBFactory.factory.open(dbPath, options); - if (LOG.isDebugEnabled()) { - LOG.debug("LevelDB successfully opened"); - LOG.debug("[Option] cacheSize = " + options.cacheSize()); - LOG.debug("[Option] createIfMissing = " + options.createIfMissing()); - LOG.debug("[Option] blockSize = " + options.blockSize()); - LOG.debug("[Option] compressionType= " + options.compressionType()); - LOG.debug("[Option] maxOpenFiles= " + options.maxOpenFiles()); - LOG.debug("[Option] writeBufferSize= "+ options.writeBufferSize()); - } - } - - /** - * Puts a Key into file. - * - * @param key - key - * @param value - value - */ - @Override - public void put(byte[] key, byte[] value) { - db.put(key, value, writeOptions); - } - - /** - * Get Key. - * - * @param key key - * @return value - */ - @Override - public byte[] get(byte[] key) { - return db.get(key); - } - - /** - * Delete Key. - * - * @param key - Key - */ - @Override - public void delete(byte[] key) { - db.delete(key); - } - - /** - * Closes the DB. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - if (db != null){ - db.close(); - } - } - - /** - * Returns true if the DB is empty. - * - * @return boolean - * @throws IOException - */ - @Override - public boolean isEmpty() throws IOException { - try (DBIterator iter = db.iterator()) { - iter.seekToFirst(); - boolean hasNext = !iter.hasNext(); - return hasNext; - } - } - - /** - * Returns the actual levelDB object. - * @return DB handle. - */ - public DB getDB() { - return db; - } - - /** - * Returns an iterator on all the key-value pairs in the DB. - * @return an iterator on DB entries. - */ - public DBIterator getIterator() { - return db.iterator(); - } - - - @Override - public void destroy() throws IOException { - close(); - JniDBFactory.factory.destroy(dbFile, dbOptions); - } - - @Override - public ImmutablePair<byte[], byte[]> peekAround(int offset, - byte[] from) throws IOException, IllegalArgumentException { - try (DBIterator it = db.iterator()) { - if (from == null) { - it.seekToFirst(); - } else { - it.seek(from); - } - if (!it.hasNext()) { - return null; - } - switch (offset) { - case 0: - Entry<byte[], byte[]> current = it.next(); - return new ImmutablePair<>(current.getKey(), current.getValue()); - case 1: - if (it.next() != null && it.hasNext()) { - Entry<byte[], byte[]> next = it.peekNext(); - return new ImmutablePair<>(next.getKey(), next.getValue()); - } - break; - case -1: - if (it.hasPrev()) { - Entry<byte[], byte[]> prev = it.peekPrev(); - return new ImmutablePair<>(prev.getKey(), prev.getValue()); - } - break; - default: - throw new IllegalArgumentException( - "Position can only be -1, 0 " + "or 1, but found " + offset); - } - } - return null; - } - - @Override - public void iterate(byte[] from, EntryConsumer consumer) - throws IOException { - try (DBIterator iter = db.iterator()) { - if (from != null) { - iter.seek(from); - } else { - iter.seekToFirst(); - } - while (iter.hasNext()) { - Entry<byte[], byte[]> current = iter.next(); - if (!consumer.consume(current.getKey(), - current.getValue())) { - break; - } - } - } - } - - /** - * Compacts the DB by removing deleted keys etc. - * @throws IOException if there is an error. - */ - @Override - public void compactDB() throws IOException { - if(db != null) { - // From LevelDB docs : begin == null and end == null means the whole DB. - db.compactRange(null, null); - } - } - - @Override - public void writeBatch(BatchOperation operation) throws IOException { - List<BatchOperation.SingleOperation> operations = - operation.getOperations(); - if (!operations.isEmpty()) { - try (WriteBatch writeBatch = db.createWriteBatch()) { - for (BatchOperation.SingleOperation opt : operations) { - switch (opt.getOpt()) { - case DELETE: - writeBatch.delete(opt.getKey()); - break; - case PUT: - writeBatch.put(opt.getKey(), opt.getValue()); - break; - default: - throw new IllegalArgumentException("Invalid operation " - + opt.getOpt()); - } - } - db.write(writeBatch); - } - } - } - - @Override - public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, - int count, MetadataKeyFilters.MetadataKeyFilter... filters) - throws IOException, IllegalArgumentException { - return getRangeKVs(startKey, count, false, filters); - } - - @Override - public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey, - int count, MetadataKeyFilters.MetadataKeyFilter... filters) - throws IOException, IllegalArgumentException { - return getRangeKVs(startKey, count, true, filters); - } - - /** - * Returns a certain range of key value pairs as a list based on a - * startKey or count. Further a {@link MetadataKeyFilter} can be added to - * filter keys if necessary. To prevent race conditions while listing - * entries, this implementation takes a snapshot and lists the entries from - * the snapshot. This may, on the other hand, cause the range result slight - * different with actual data if data is updating concurrently. - * <p> - * If the startKey is specified and found in levelDB, this key and the keys - * after this key will be included in the result. If the startKey is null - * all entries will be included as long as other conditions are satisfied. - * If the given startKey doesn't exist, an empty list will be returned. - * <p> - * The count argument is to limit number of total entries to return, - * the value for count must be an integer greater than 0. - * <p> - * This method allows to specify one or more {@link MetadataKeyFilter} - * to filter keys by certain condition. Once given, only the entries - * whose key passes all the filters will be included in the result. - * - * @param startKey a start key. - * @param count max number of entries to return. - * @param filters customized one or more {@link MetadataKeyFilter}. - * @return a list of entries found in the database or an empty list if the - * startKey is invalid. - * @throws IOException if there are I/O errors. - * @throws IllegalArgumentException if count is less than 0. - */ - private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, - int count, boolean sequential, MetadataKeyFilter... filters) - throws IOException { - List<Entry<byte[], byte[]>> result = new ArrayList<>(); - long start = System.currentTimeMillis(); - if (count < 0) { - throw new IllegalArgumentException( - "Invalid count given " + count + ", count must be greater than 0"); - } - Snapshot snapShot = null; - DBIterator dbIter = null; - try { - snapShot = db.getSnapshot(); - ReadOptions readOptions = new ReadOptions().snapshot(snapShot); - dbIter = db.iterator(readOptions); - if (startKey == null) { - dbIter.seekToFirst(); - } else { - if (db.get(startKey) == null) { - // Key not found, return empty list - return result; - } - dbIter.seek(startKey); - } - while (dbIter.hasNext() && result.size() < count) { - byte[] preKey = dbIter.hasPrev() ? dbIter.peekPrev().getKey() : null; - byte[] nextKey = dbIter.hasNext() ? dbIter.peekNext().getKey() : null; - Entry<byte[], byte[]> current = dbIter.next(); - - if (filters == null) { - result.add(current); - } else { - if (Arrays.asList(filters).stream().allMatch( - entry -> entry.filterKey(preKey, current.getKey(), nextKey))) { - result.add(current); - } else { - if (result.size() > 0 && sequential) { - // if the caller asks for a sequential range of results, - // and we met a dis-match, abort iteration from here. - // if result is empty, we continue to look for the first match. - break; - } - } - } - } - } finally { - if (snapShot != null) { - snapShot.close(); - } - if (dbIter != null) { - dbIter.close(); - } - if (LOG.isDebugEnabled()) { - if (filters != null) { - for (MetadataKeyFilters.MetadataKeyFilter filter : filters) { - int scanned = filter.getKeysScannedNum(); - int hinted = filter.getKeysHintedNum(); - if (scanned > 0 || hinted > 0) { - LOG.debug( - "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}", - filter.getClass().getSimpleName(), filter.getKeysScannedNum(), - filter.getKeysHintedNum()); - } - } - } - long end = System.currentTimeMillis(); - long timeConsumed = end - start; - LOG.debug("Time consumed for getRangeKVs() is {}ms," - + " result length is {}.", timeConsumed, result.size()); - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java deleted file mode 100644 index 3ff0a94..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.utils; - -import com.google.common.base.Strings; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.OzoneConsts; - -/** - * An utility class to filter levelDB keys. - */ -public final class MetadataKeyFilters { - - private static KeyPrefixFilter deletingKeyFilter = - new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX); - - private static KeyPrefixFilter normalKeyFilter = - new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX, - true); - - private MetadataKeyFilters() { - } - - public static KeyPrefixFilter getDeletingKeyFilter() { - return deletingKeyFilter; - } - - public static KeyPrefixFilter getNormalKeyFilter() { - return normalKeyFilter; - } - /** - * Interface for levelDB key filters. - */ - public interface MetadataKeyFilter { - /** - * Filter levelDB key with a certain condition. - * - * @param preKey previous key. - * @param currentKey current key. - * @param nextKey next key. - * @return true if a certain condition satisfied, return false otherwise. - */ - boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey); - - default int getKeysScannedNum() { - return 0; - } - - default int getKeysHintedNum() { - return 0; - } - } - - /** - * Utility class to filter key by a string prefix. This filter - * assumes keys can be parsed to a string. - */ - public static class KeyPrefixFilter implements MetadataKeyFilter { - - private String keyPrefix = null; - private int keysScanned = 0; - private int keysHinted = 0; - private Boolean negative; - - public KeyPrefixFilter(String keyPrefix) { - this(keyPrefix, false); - } - - public KeyPrefixFilter(String keyPrefix, boolean negative) { - this.keyPrefix = keyPrefix; - this.negative = negative; - } - - @Override - public boolean filterKey(byte[] preKey, byte[] currentKey, - byte[] nextKey) { - keysScanned++; - boolean accept = false; - if (Strings.isNullOrEmpty(keyPrefix)) { - accept = true; - } else { - if (currentKey != null && - DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) { - keysHinted++; - accept = true; - } else { - accept = false; - } - } - return (negative) ? !accept : accept; - } - - @Override - public int getKeysScannedNum() { - return keysScanned; - } - - @Override - public int getKeysHintedNum() { - return keysHinted; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java deleted file mode 100644 index b90b08f..0000000 --- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.utils; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Interface for key-value store that stores ozone metadata. - * Ozone metadata is stored as key value pairs, both key and value - * are arbitrary byte arrays. - */ -@InterfaceStability.Evolving -public interface MetadataStore extends Closeable{ - - /** - * Puts a key-value pair into the store. - * - * @param key metadata key - * @param value metadata value - */ - void put(byte[] key, byte[] value) throws IOException; - - /** - * @return true if the metadata store is empty. - * - * @throws IOException - */ - boolean isEmpty() throws IOException; - - /** - * Returns the value mapped to the given key in byte array. - * - * @param key metadata key - * @return value in byte array - * @throws IOException - */ - byte[] get(byte[] key) throws IOException; - - /** - * Deletes a key from the metadata store. - * - * @param key metadata key - * @throws IOException - */ - void delete(byte[] key) throws IOException; - - /** - * Returns a certain range of key value pairs as a list based on a - * startKey or count. Further a {@link MetadataKeyFilter} can be added to - * filter keys if necessary. To prevent race conditions while listing - * entries, this implementation takes a snapshot and lists the entries from - * the snapshot. This may, on the other hand, cause the range result slight - * different with actual data if data is updating concurrently. - * <p> - * If the startKey is specified and found in levelDB, this key and the keys - * after this key will be included in the result. If the startKey is null - * all entries will be included as long as other conditions are satisfied. - * If the given startKey doesn't exist and empty list will be returned. - * <p> - * The count argument is to limit number of total entries to return, - * the value for count must be an integer greater than 0. - * <p> - * This method allows to specify one or more {@link MetadataKeyFilter} - * to filter keys by certain condition. Once given, only the entries - * whose key passes all the filters will be included in the result. - * - * @param startKey a start key. - * @param count max number of entries to return. - * @param filters customized one or more {@link MetadataKeyFilter}. - * @return a list of entries found in the database or an empty list if the - * startKey is invalid. - * @throws IOException if there are I/O errors. - * @throws IllegalArgumentException if count is less than 0. - */ - List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, - int count, MetadataKeyFilter... filters) - throws IOException, IllegalArgumentException; - - /** - * This method is very similar to {@link #getRangeKVs}, the only - * different is this method is supposed to return a sequential range - * of elements based on the filters. While iterating the elements, - * if it met any entry that cannot pass the filter, the iterator will stop - * from this point without looking for next match. If no filter is given, - * this method behaves just like {@link #getRangeKVs}. - * - * @param startKey a start key. - * @param count max number of entries to return. - * @param filters customized one or more {@link MetadataKeyFilter}. - * @return a list of entries found in the database. - * @throws IOException - * @throws IllegalArgumentException - */ - List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey, - int count, MetadataKeyFilter... filters) - throws IOException, IllegalArgumentException; - - /** - * A batch of PUT, DELETE operations handled as a single atomic write. - * - * @throws IOException write fails - */ - void writeBatch(BatchOperation operation) throws IOException; - - /** - * Compact the entire database. - * @throws IOException - */ - void compactDB() throws IOException; - - /** - * Destroy the content of the specified database, - * a destroyed database will not be able to load again. - * Be very careful with this method. - * - * @throws IOException if I/O error happens - */ - void destroy() throws IOException; - - /** - * Seek the database to a certain key, returns the key-value - * pairs around this key based on the given offset. Note, this method - * can only support offset -1 (left), 0 (current) and 1 (right), - * any other offset given will cause a {@link IllegalArgumentException}. - * - * @param offset offset to the key - * @param from from which key - * @return a key-value pair - * @throws IOException - */ - ImmutablePair<byte[], byte[]> peekAround(int offset, byte[] from) - throws IOException, IllegalArgumentException; - - /** - * Iterates entries in the database from a certain key. - * Applies the given {@link EntryConsumer} to the key and value of - * each entry, the function produces a boolean result which is used - * as the criteria to exit from iteration. - * - * @param from the start key - * @param consumer - * a {@link EntryConsumer} applied to each key and value. If the consumer - * returns true, continues the iteration to next entry; otherwise exits - * the iteration. - * @throws IOException - */ - void iterate(byte[] from, EntryConsumer consumer) - throws IOException; -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org