http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java new file mode 100644 index 0000000..0d4a299 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.client; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; + +/** + * The interface to call into underlying container layer. + * + * Written as interface to allow easy testing: implement a mock container layer + * for standalone testing of CBlock API without actually calling into remote + * containers. Actual container layer can simply re-implement this. + * + * NOTE this is temporarily needed class. When SCM containers are full-fledged, + * this interface will likely be removed. + */ +@InterfaceStability.Unstable +public interface ScmClient { + /** + * Creates a Container on SCM and returns the pipeline. + * @param containerId - String container ID + * @return Pipeline + * @throws IOException + */ + Pipeline createContainer(String containerId, String owner) throws IOException; + + /** + * Gets a container by Name -- Throws if the container does not exist. + * @param containerId - String Container ID + * @return Pipeline + * @throws IOException + */ + Pipeline getContainer(String containerId) throws IOException; + + /** + * Close a container by name. + * + * @param pipeline the container to be closed. + * @throws IOException + */ + void closeContainer(Pipeline pipeline) throws IOException; + + /** + * Deletes an existing container. + * @param pipeline - Pipeline that represents the container. + * @param force - true to forcibly delete the container. + * @throws IOException + */ + void deleteContainer(Pipeline pipeline, boolean force) throws IOException; + + /** + * Lists a range of containers and get their info. + * + * @param startName start name, if null, start searching at the head. + * @param prefixName prefix name, if null, then filter is disabled. + * @param count count, if count < 0, the max size is unlimited.( + * Usually the count will be replace with a very big + * value instead of being unlimited in case the db is very big) + * + * @return a list of pipeline. + * @throws IOException + */ + List<ContainerInfo> listContainer(String startName, String prefixName, + int count) throws IOException; + + /** + * Read meta data from an existing container. + * @param pipeline - Pipeline that represents the container. + * @return ContainerInfo + * @throws IOException + */ + ContainerData readContainer(Pipeline pipeline) throws IOException; + + + /** + * Gets the container size -- Computed by SCM from Container Reports. + * @param pipeline - Pipeline + * @return number of bytes used by this container. + * @throws IOException + */ + long getContainerSize(Pipeline pipeline) throws IOException; + + /** + * Creates a Container on SCM and returns the pipeline. + * @param type - Replication Type. + * @param replicationFactor - Replication Factor + * @param containerId - Container ID + * @return Pipeline + * @throws IOException - in case of error. + */ + Pipeline createContainer(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor replicationFactor, String containerId, + String owner) throws IOException; + + /** + * Returns a set of Nodes that meet a query criteria. + * @param nodeStatuses - A set of criteria that we want the node to have. + * @param queryScope - Query scope - Cluster or pool. + * @param poolName - if it is pool, a pool name is required. + * @return A set of nodes that meet the requested criteria. + * @throws IOException + */ + HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses, + HddsProtos.QueryScope queryScope, String poolName) throws IOException; + + /** + * Creates a specified replication pipeline. + * @param type - Type + * @param factor - Replication factor + * @param nodePool - Set of machines. + * @throws IOException + */ + Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) + throws IOException; +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java new file mode 100644 index 0000000..e2f7033 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.client; + +/** + * This package contains classes for the client of the storage container + * protocol. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java new file mode 100644 index 0000000..9520c8c --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.util.MathUtils; + +/** + * Container ID is an integer that is a value between 1..MAX_CONTAINER ID. + * <p> + * We are creating a specific type for this to avoid mixing this with + * normal integers in code. + */ +public class ContainerID implements Comparable { + + private final long id; + + /** + * Constructs ContainerID. + * + * @param id int + */ + public ContainerID(long id) { + Preconditions.checkState(id > 0, + "Container ID should be a positive int"); + this.id = id; + } + + /** + * Factory method for creation of ContainerID. + * @param containerID long + * @return ContainerID. + */ + public static ContainerID valueof(long containerID) { + Preconditions.checkState(containerID > 0); + return new ContainerID(containerID); + } + + /** + * Returns int representation of ID. + * + * @return int + */ + public long getId() { + return id; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ContainerID that = (ContainerID) o; + + return id == that.id; + } + + @Override + public int hashCode() { + return MathUtils.hash(id); + } + + @Override + public int compareTo(Object o) { + Preconditions.checkNotNull(o); + if (o instanceof ContainerID) { + return Long.compare(((ContainerID) o).getId(), this.getId()); + } + throw new IllegalArgumentException("Object O, should be an instance " + + "of ContainerID"); + } + + @Override + public String toString() { + return "id=" + id; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java new file mode 100644 index 0000000..d253b15 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.common.helpers; + +/** + * Allocated block wraps the result returned from SCM#allocateBlock which + * contains a Pipeline and the key. + */ +public final class AllocatedBlock { + private Pipeline pipeline; + private String key; + // Indicates whether the client should create container before writing block. + private boolean shouldCreateContainer; + + /** + * Builder for AllocatedBlock. + */ + public static class Builder { + private Pipeline pipeline; + private String key; + private boolean shouldCreateContainer; + + public Builder setPipeline(Pipeline p) { + this.pipeline = p; + return this; + } + + public Builder setKey(String k) { + this.key = k; + return this; + } + + public Builder setShouldCreateContainer(boolean shouldCreate) { + this.shouldCreateContainer = shouldCreate; + return this; + } + + public AllocatedBlock build() { + return new AllocatedBlock(pipeline, key, shouldCreateContainer); + } + } + + private AllocatedBlock(Pipeline pipeline, String key, + boolean shouldCreateContainer) { + this.pipeline = pipeline; + this.key = key; + this.shouldCreateContainer = shouldCreateContainer; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public String getKey() { + return key; + } + + public boolean getCreateContainer() { + return shouldCreateContainer; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java new file mode 100644 index 0000000..823a7fb --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.util.Time; + +import java.util.Comparator; + +/** + * Class wraps ozone container info. + */ +public class ContainerInfo + implements Comparator<ContainerInfo>, Comparable<ContainerInfo> { + private HddsProtos.LifeCycleState state; + private Pipeline pipeline; + // Bytes allocated by SCM for clients. + private long allocatedBytes; + // Actual container usage, updated through heartbeat. + private long usedBytes; + private long numberOfKeys; + private long lastUsed; + // The wall-clock ms since the epoch at which the current state enters. + private long stateEnterTime; + private String owner; + private String containerName; + private long containerID; + ContainerInfo( + long containerID, + final String containerName, + HddsProtos.LifeCycleState state, + Pipeline pipeline, + long allocatedBytes, + long usedBytes, + long numberOfKeys, + long stateEnterTime, + String owner) { + this.containerID = containerID; + this.containerName = containerName; + this.pipeline = pipeline; + this.allocatedBytes = allocatedBytes; + this.usedBytes = usedBytes; + this.numberOfKeys = numberOfKeys; + this.lastUsed = Time.monotonicNow(); + this.state = state; + this.stateEnterTime = stateEnterTime; + this.owner = owner; + } + + /** + * Needed for serialization findbugs. + */ + public ContainerInfo() { + } + + public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) { + ContainerInfo.Builder builder = new ContainerInfo.Builder(); + builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline())); + builder.setAllocatedBytes(info.getAllocatedBytes()); + builder.setUsedBytes(info.getUsedBytes()); + builder.setNumberOfKeys(info.getNumberOfKeys()); + builder.setState(info.getState()); + builder.setStateEnterTime(info.getStateEnterTime()); + builder.setOwner(info.getOwner()); + builder.setContainerName(info.getContainerName()); + builder.setContainerID(info.getContainerID()); + return builder.build(); + } + + public long getContainerID() { + return containerID; + } + + public String getContainerName() { + return containerName; + } + + public HddsProtos.LifeCycleState getState() { + return state; + } + + public void setState(HddsProtos.LifeCycleState state) { + this.state = state; + } + + public long getStateEnterTime() { + return stateEnterTime; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public long getAllocatedBytes() { + return allocatedBytes; + } + + /** + * Set Allocated bytes. + * + * @param size - newly allocated bytes -- negative size is case of deletes + * can be used. + */ + public void updateAllocatedBytes(long size) { + this.allocatedBytes += size; + } + + public long getUsedBytes() { + return usedBytes; + } + + public long getNumberOfKeys() { + return numberOfKeys; + } + + public ContainerID containerID() { + return new ContainerID(getContainerID()); + } + + /** + * Gets the last used time from SCM's perspective. + * + * @return time in milliseconds. + */ + public long getLastUsed() { + return lastUsed; + } + + public void updateLastUsedTime() { + lastUsed = Time.monotonicNow(); + } + + public void allocate(long size) { + // should we also have total container size in ContainerInfo + // and check before allocating? + allocatedBytes += size; + } + + public HddsProtos.SCMContainerInfo getProtobuf() { + HddsProtos.SCMContainerInfo.Builder builder = + HddsProtos.SCMContainerInfo.newBuilder(); + builder.setPipeline(getPipeline().getProtobufMessage()); + builder.setAllocatedBytes(getAllocatedBytes()); + builder.setUsedBytes(getUsedBytes()); + builder.setNumberOfKeys(getNumberOfKeys()); + builder.setState(state); + builder.setStateEnterTime(stateEnterTime); + builder.setContainerID(getContainerID()); + + if (getOwner() != null) { + builder.setOwner(getOwner()); + } + builder.setContainerName(getContainerName()); + return builder.build(); + } + + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + @Override + public String toString() { + return "ContainerInfo{" + + "state=" + state + + ", pipeline=" + pipeline + + ", stateEnterTime=" + stateEnterTime + + ", owner=" + owner + + ", containerName='" + containerName + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ContainerInfo that = (ContainerInfo) o; + + return new EqualsBuilder() + .append(pipeline.getContainerName(), that.pipeline.getContainerName()) + + // TODO : Fix this later. If we add these factors some tests fail. + // So Commenting this to continue and will enforce this with + // Changes in pipeline where we remove Container Name to + // SCMContainerinfo from Pipeline. + // .append(pipeline.getFactor(), that.pipeline.getFactor()) + // .append(pipeline.getType(), that.pipeline.getType()) + .append(owner, that.owner) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(11, 811) + .append(pipeline.getContainerName()) + .append(pipeline.getFactor()) + .append(pipeline.getType()) + .append(owner) + .toHashCode(); + } + + /** + * Compares its two arguments for order. Returns a negative integer, zero, or + * a positive integer as the first argument is less than, equal to, or greater + * than the second.<p> + * + * @param o1 the first object to be compared. + * @param o2 the second object to be compared. + * @return a negative integer, zero, or a positive integer as the first + * argument is less than, equal to, or greater than the second. + * @throws NullPointerException if an argument is null and this comparator + * does not permit null arguments + * @throws ClassCastException if the arguments' types prevent them from + * being compared by this comparator. + */ + @Override + public int compare(ContainerInfo o1, ContainerInfo o2) { + return Long.compare(o1.getLastUsed(), o2.getLastUsed()); + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less than, + * equal to, or greater than the specified object. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(ContainerInfo o) { + return this.compare(this, o); + } + + /** + * Builder class for ContainerInfo. + */ + public static class Builder { + private HddsProtos.LifeCycleState state; + private Pipeline pipeline; + private long allocated; + private long used; + private long keys; + private long stateEnterTime; + private String owner; + private String containerName; + private long containerID; + + public Builder setContainerID(long id) { + Preconditions.checkState(id >= 0); + this.containerID = id; + return this; + } + + public Builder setState(HddsProtos.LifeCycleState lifeCycleState) { + this.state = lifeCycleState; + return this; + } + + public Builder setPipeline(Pipeline containerPipeline) { + this.pipeline = containerPipeline; + return this; + } + + public Builder setAllocatedBytes(long bytesAllocated) { + this.allocated = bytesAllocated; + return this; + } + + public Builder setUsedBytes(long bytesUsed) { + this.used = bytesUsed; + return this; + } + + public Builder setNumberOfKeys(long keyCount) { + this.keys = keyCount; + return this; + } + + public Builder setStateEnterTime(long time) { + this.stateEnterTime = time; + return this; + } + + public Builder setOwner(String containerOwner) { + this.owner = containerOwner; + return this; + } + + public Builder setContainerName(String container) { + this.containerName = container; + return this; + } + + public ContainerInfo build() { + return new + ContainerInfo(containerID, containerName, state, pipeline, + allocated, used, keys, stateEnterTime, owner); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java new file mode 100644 index 0000000..fd97eae --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import static org.apache.hadoop.hdds.protocol.proto + .ScmBlockLocationProtocolProtos.DeleteScmBlockResult; + +/** + * Class wraps storage container manager block deletion results. + */ +public class DeleteBlockResult { + private String key; + private DeleteScmBlockResult.Result result; + + public DeleteBlockResult(final String key, + final DeleteScmBlockResult.Result result) { + this.key = key; + this.result = result; + } + + /** + * Get key deleted. + * @return key name. + */ + public String getKey() { + return key; + } + + /** + * Get key deletion result. + * @return key deletion result. + */ + public DeleteScmBlockResult.Result getResult() { + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java new file mode 100644 index 0000000..32d0a2d --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonFilter; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.ser.FilterProvider; +import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; +import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A pipeline represents the group of machines over which a container lives. + */ +public class Pipeline { + static final String PIPELINE_INFO = "PIPELINE_INFO_FILTER"; + private static final ObjectWriter WRITER; + + static { + ObjectMapper mapper = new ObjectMapper(); + String[] ignorableFieldNames = {"data"}; + FilterProvider filters = new SimpleFilterProvider() + .addFilter(PIPELINE_INFO, SimpleBeanPropertyFilter + .serializeAllExcept(ignorableFieldNames)); + mapper.setVisibility(PropertyAccessor.FIELD, + JsonAutoDetect.Visibility.ANY); + mapper.addMixIn(Object.class, MixIn.class); + + WRITER = mapper.writer(filters); + } + + private String containerName; + private PipelineChannel pipelineChannel; + /** + * Allows you to maintain private data on pipelines. This is not serialized + * via protobuf, just allows us to maintain some private data. + */ + @JsonIgnore + private byte[] data; + /** + * Constructs a new pipeline data structure. + * + * @param containerName - Container + * @param pipelineChannel - transport information for this container + */ + public Pipeline(String containerName, PipelineChannel pipelineChannel) { + this.containerName = containerName; + this.pipelineChannel = pipelineChannel; + data = null; + } + + /** + * Gets pipeline object from protobuf. + * + * @param pipeline - ProtoBuf definition for the pipeline. + * @return Pipeline Object + */ + public static Pipeline getFromProtoBuf(HddsProtos.Pipeline pipeline) { + Preconditions.checkNotNull(pipeline); + PipelineChannel pipelineChannel = + PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel()); + return new Pipeline(pipeline.getContainerName(), pipelineChannel); + } + + public HddsProtos.ReplicationFactor getFactor() { + return pipelineChannel.getFactor(); + } + + /** + * Returns the first machine in the set of datanodes. + * + * @return First Machine. + */ + @JsonIgnore + public DatanodeDetails getLeader() { + return pipelineChannel.getDatanodes().get(pipelineChannel.getLeaderID()); + } + + /** + * Returns the leader host. + * + * @return First Machine. + */ + public String getLeaderHost() { + return pipelineChannel.getDatanodes() + .get(pipelineChannel.getLeaderID()).getHostName(); + } + + /** + * Returns all machines that make up this pipeline. + * + * @return List of Machines. + */ + @JsonIgnore + public List<DatanodeDetails> getMachines() { + return new ArrayList<>(pipelineChannel.getDatanodes().values()); + } + + /** + * Returns all machines that make up this pipeline. + * + * @return List of Machines. + */ + public List<String> getDatanodeHosts() { + List<String> dataHosts = new ArrayList<>(); + for (DatanodeDetails id : pipelineChannel.getDatanodes().values()) { + dataHosts.add(id.getHostName()); + } + return dataHosts; + } + + /** + * Return a Protobuf Pipeline message from pipeline. + * + * @return Protobuf message + */ + @JsonIgnore + public HddsProtos.Pipeline getProtobufMessage() { + HddsProtos.Pipeline.Builder builder = + HddsProtos.Pipeline.newBuilder(); + builder.setContainerName(this.containerName); + builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage()); + return builder.build(); + } + + /** + * Returns containerName if available. + * + * @return String. + */ + public String getContainerName() { + return containerName; + } + + /** + * Returns private data that is set on this pipeline. + * + * @return blob, the user can interpret it any way they like. + */ + public byte[] getData() { + if (this.data != null) { + return Arrays.copyOf(this.data, this.data.length); + } else { + return null; + } + } + + @VisibleForTesting + public PipelineChannel getPipelineChannel() { + return pipelineChannel; + } + + /** + * Set private data on pipeline. + * + * @param data -- private data. + */ + public void setData(byte[] data) { + if (data != null) { + this.data = Arrays.copyOf(data, data.length); + } + } + + /** + * Gets the State of the pipeline. + * + * @return - LifeCycleStates. + */ + public HddsProtos.LifeCycleState getLifeCycleState() { + return pipelineChannel.getLifeCycleState(); + } + + /** + * Gets the pipeline Name. + * + * @return - Name of the pipeline + */ + public String getPipelineName() { + return pipelineChannel.getName(); + } + + /** + * Returns the type. + * + * @return type - Standalone, Ratis, Chained. + */ + public HddsProtos.ReplicationType getType() { + return pipelineChannel.getType(); + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(getClass().getSimpleName()) + .append("["); + pipelineChannel.getDatanodes().keySet().stream() + .forEach(id -> b. + append(id.endsWith(pipelineChannel.getLeaderID()) ? "*" + id : id)); + b.append("] container:").append(containerName); + b.append(" name:").append(getPipelineName()); + if (getType() != null) { + b.append(" type:").append(getType().toString()); + } + if (getFactor() != null) { + b.append(" factor:").append(getFactor().toString()); + } + if (getLifeCycleState() != null) { + b.append(" State:").append(getLifeCycleState().toString()); + } + return b.toString(); + } + + /** + * Returns a JSON string of this object. + * + * @return String - json string + * @throws IOException + */ + public String toJsonString() throws IOException { + return WRITER.writeValueAsString(this); + } + + @JsonFilter(PIPELINE_INFO) + class MixIn { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java new file mode 100644 index 0000000..ebd52e9 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; + +import java.util.Map; +import java.util.TreeMap; + +/** + * PipelineChannel information for a {@link Pipeline}. + */ +public class PipelineChannel { + @JsonIgnore + private String leaderID; + @JsonIgnore + private Map<String, DatanodeDetails> datanodes; + private LifeCycleState lifeCycleState; + private ReplicationType type; + private ReplicationFactor factor; + private String name; + + public PipelineChannel(String leaderID, LifeCycleState lifeCycleState, + ReplicationType replicationType, ReplicationFactor replicationFactor, + String name) { + this.leaderID = leaderID; + this.lifeCycleState = lifeCycleState; + this.type = replicationType; + this.factor = replicationFactor; + this.name = name; + datanodes = new TreeMap<>(); + } + + public String getLeaderID() { + return leaderID; + } + + public Map<String, DatanodeDetails> getDatanodes() { + return datanodes; + } + + public LifeCycleState getLifeCycleState() { + return lifeCycleState; + } + + public ReplicationType getType() { + return type; + } + + public ReplicationFactor getFactor() { + return factor; + } + + public String getName() { + return name; + } + + public void addMember(DatanodeDetails datanodeDetails) { + datanodes.put(datanodeDetails.getUuid().toString(), + datanodeDetails); + } + + @JsonIgnore + public HddsProtos.PipelineChannel getProtobufMessage() { + HddsProtos.PipelineChannel.Builder builder = + HddsProtos.PipelineChannel.newBuilder(); + for (DatanodeDetails datanode : datanodes.values()) { + builder.addMembers(datanode.getProtoBufMessage()); + } + builder.setLeaderID(leaderID); + + if (this.getLifeCycleState() != null) { + builder.setState(this.getLifeCycleState()); + } + if (this.getType() != null) { + builder.setType(this.getType()); + } + + if (this.getFactor() != null) { + builder.setFactor(this.getFactor()); + } + return builder.build(); + } + + public static PipelineChannel getFromProtoBuf( + HddsProtos.PipelineChannel transportProtos) { + Preconditions.checkNotNull(transportProtos); + PipelineChannel pipelineChannel = + new PipelineChannel(transportProtos.getLeaderID(), + transportProtos.getState(), + transportProtos.getType(), + transportProtos.getFactor(), + transportProtos.getName()); + + for (HddsProtos.DatanodeDetailsProto dataID : + transportProtos.getMembersList()) { + pipelineChannel.addMember(DatanodeDetails.getFromProtoBuf(dataID)); + } + return pipelineChannel; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/StorageContainerException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/StorageContainerException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/StorageContainerException.java new file mode 100644 index 0000000..35d8444 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/StorageContainerException.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; + +import java.io.IOException; + +/** + * Exceptions thrown from the Storage Container. + */ +public class StorageContainerException extends IOException { + private ContainerProtos.Result result; + + /** + * Constructs an {@code IOException} with {@code null} + * as its error detail message. + */ + public StorageContainerException(ContainerProtos.Result result) { + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified detail message. + * + * @param message The detail message (which is saved for later retrieval by + * the {@link #getMessage()} method) + * @param result - The result code + */ + public StorageContainerException(String message, + ContainerProtos.Result result) { + super(message); + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified detail message + * and cause. + * <p> + * <p> Note that the detail message associated with {@code cause} is + * <i>not</i> automatically incorporated into this exception's detail + * message. + * + * @param message The detail message (which is saved for later retrieval by + * the {@link #getMessage()} method) + * + * @param cause The cause (which is saved for later retrieval by the {@link + * #getCause()} method). (A null value is permitted, and indicates that the + * cause is nonexistent or unknown.) + * + * @param result - The result code + * @since 1.6 + */ + public StorageContainerException(String message, Throwable cause, + ContainerProtos.Result result) { + super(message, cause); + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified cause and a + * detail message of {@code (cause==null ? null : cause.toString())} + * (which typically contains the class and detail message of {@code cause}). + * This constructor is useful for IO exceptions that are little more + * than wrappers for other throwables. + * + * @param cause The cause (which is saved for later retrieval by the {@link + * #getCause()} method). (A null value is permitted, and indicates that the + * cause is nonexistent or unknown.) + * @param result - The result code + * @since 1.6 + */ + public StorageContainerException(Throwable cause, ContainerProtos.Result + result) { + super(cause); + this.result = result; + } + + /** + * Returns Result. + * + * @return Result. + */ + public ContainerProtos.Result getResult() { + return result; + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/package-info.java new file mode 100644 index 0000000..ffe0d3d --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.common.helpers; +/** + Contains protocol buffer helper classes and utilites used in + impl. + **/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/package-info.java new file mode 100644 index 0000000..3c544db --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +/** + * This package contains classes for the client of the storage container + * protocol. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/LocatedContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/LocatedContainer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/LocatedContainer.java new file mode 100644 index 0000000..14ee3d2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/LocatedContainer.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +import java.util.Set; + +/** + * Holds the nodes that currently host the container for an object key hash. + */ +@InterfaceAudience.Private +public final class LocatedContainer { + private final String key; + private final String matchedKeyPrefix; + private final String containerName; + private final Set<DatanodeInfo> locations; + private final DatanodeInfo leader; + + /** + * Creates a LocatedContainer. + * + * @param key object key + * @param matchedKeyPrefix prefix of key that was used to find the location + * @param containerName container name + * @param locations nodes that currently host the container + * @param leader node that currently acts as pipeline leader + */ + public LocatedContainer(String key, String matchedKeyPrefix, + String containerName, Set<DatanodeInfo> locations, DatanodeInfo leader) { + this.key = key; + this.matchedKeyPrefix = matchedKeyPrefix; + this.containerName = containerName; + this.locations = locations; + this.leader = leader; + } + + /** + * Returns the container name. + * + * @return container name + */ + public String getContainerName() { + return this.containerName; + } + + /** + * Returns the object key. + * + * @return object key + */ + public String getKey() { + return this.key; + } + + /** + * Returns the node that currently acts as pipeline leader. + * + * @return node that currently acts as pipeline leader + */ + public DatanodeInfo getLeader() { + return this.leader; + } + + /** + * Returns the nodes that currently host the container. + * + * @return Set<DatanodeInfo> nodes that currently host the container + */ + public Set<DatanodeInfo> getLocations() { + return this.locations; + } + + /** + * Returns the prefix of the key that was used to find the location. + * + * @return prefix of the key that was used to find the location + */ + public String getMatchedKeyPrefix() { + return this.matchedKeyPrefix; + } + + @Override + public boolean equals(Object otherObj) { + if (otherObj == null) { + return false; + } + if (!(otherObj instanceof LocatedContainer)) { + return false; + } + LocatedContainer other = (LocatedContainer)otherObj; + return this.key == null ? other.key == null : this.key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "{key=" + key + + "; matchedKeyPrefix=" + matchedKeyPrefix + + "; containerName=" + containerName + + "; locations=" + locations + + "; leader=" + leader + + "}"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java new file mode 100644 index 0000000..f100fc7 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.protocol; + +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +/** + * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes + * to read/write a block. + */ +public interface ScmBlockLocationProtocol { + + /** + * Find the set of nodes to read/write a block, as + * identified by the block key. This method supports batch lookup by + * passing multiple keys. + * + * @param keys batch of block keys to find + * @return allocated blocks for each block key + * @throws IOException if there is any failure + */ + Set<AllocatedBlock> getBlockLocations(Set<String> keys) throws IOException; + + /** + * Asks SCM where a block should be allocated. SCM responds with the + * set of datanodes that should be used creating this block. + * @param size - size of the block. + * @return allocated block accessing info (key, pipeline). + * @throws IOException + */ + AllocatedBlock allocateBlock(long size, ReplicationType type, + ReplicationFactor factor, String owner) throws IOException; + + /** + * Delete blocks for a set of object keys. + * + * @param keyBlocksInfoList Map of object key and its blocks. + * @return list of block deletion results. + * @throws IOException if there is any failure. + */ + List<DeleteBlockGroupResult> + deleteKeyBlocks(List<BlockGroup> keyBlocksInfoList) throws IOException; + + /** + * Gets the Clusterid and SCM Id from SCM. + */ + ScmInfo getScmInfo() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmLocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmLocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmLocatedBlock.java new file mode 100644 index 0000000..6cbdee4 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmLocatedBlock.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Holds the nodes that currently host the block for a block key. + */ +@InterfaceAudience.Private +public final class ScmLocatedBlock { + private final String key; + private final List<DatanodeInfo> locations; + private final DatanodeInfo leader; + + /** + * Creates a ScmLocatedBlock. + * + * @param key object key + * @param locations nodes that currently host the block + * @param leader node that currently acts as pipeline leader + */ + public ScmLocatedBlock(final String key, final List<DatanodeInfo> locations, + final DatanodeInfo leader) { + this.key = key; + this.locations = locations; + this.leader = leader; + } + + /** + * Returns the object key. + * + * @return object key + */ + public String getKey() { + return this.key; + } + + /** + * Returns the node that currently acts as pipeline leader. + * + * @return node that currently acts as pipeline leader + */ + public DatanodeInfo getLeader() { + return this.leader; + } + + /** + * Returns the nodes that currently host the block. + * + * @return List<DatanodeInfo> nodes that currently host the block + */ + public List<DatanodeInfo> getLocations() { + return this.locations; + } + + @Override + public boolean equals(Object otherObj) { + if (otherObj == null) { + return false; + } + if (!(otherObj instanceof ScmLocatedBlock)) { + return false; + } + ScmLocatedBlock other = (ScmLocatedBlock)otherObj; + return this.key == null ? other.key == null : this.key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{key=" + key + "; locations=" + + locations.stream().map(loc -> loc.toString()).collect(Collectors + .joining(",")) + "; leader=" + leader + "}"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java new file mode 100644 index 0000000..a60fbb2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.protocol; + +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; + +/** + * ContainerLocationProtocol is used by an HDFS node to find the set of nodes + * that currently host a container. + */ +public interface StorageContainerLocationProtocol { + /** + * Asks SCM where a container should be allocated. SCM responds with the + * set of datanodes that should be used creating this container. + * + */ + Pipeline allocateContainer(HddsProtos.ReplicationType replicationType, + HddsProtos.ReplicationFactor factor, String containerName, String owner) + throws IOException; + + /** + * Ask SCM the location of the container. SCM responds with a group of + * nodes where this container and its replicas are located. + * + * @param containerName - Name of the container. + * @return Pipeline - the pipeline where container locates. + * @throws IOException + */ + Pipeline getContainer(String containerName) throws IOException; + + /** + * Ask SCM a list of containers with a range of container names + * and the limit of count. + * Search container names between start name(exclusive), and + * use prefix name to filter the result. the max size of the + * searching range cannot exceed the value of count. + * + * @param startName start name, if null, start searching at the head. + * @param prefixName prefix name, if null, then filter is disabled. + * @param count count, if count < 0, the max size is unlimited.( + * Usually the count will be replace with a very big + * value instead of being unlimited in case the db is very big) + * + * @return a list of container. + * @throws IOException + */ + List<ContainerInfo> listContainer(String startName, String prefixName, + int count) throws IOException; + + /** + * Deletes a container in SCM. + * + * @param containerName + * @throws IOException + * if failed to delete the container mapping from db store + * or container doesn't exist. + */ + void deleteContainer(String containerName) throws IOException; + + /** + * Queries a list of Node Statuses. + * @param nodeStatuses + * @return List of Datanodes. + */ + HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses, + HddsProtos.QueryScope queryScope, String poolName) throws IOException; + + /** + * Notify from client when begin or finish creating objects like pipeline + * or containers on datanodes. + * Container will be in Operational state after that. + * @param type object type + * @param name object name + * @param op operation type (e.g., create, close, delete) + * @param stage creation stage + */ + void notifyObjectStageChange( + ObjectStageChangeRequestProto.Type type, String name, + ObjectStageChangeRequestProto.Op op, + ObjectStageChangeRequestProto.Stage stage) throws IOException; + + /** + * Creates a replication pipeline of a specified type. + * @param type - replication type + * @param factor - factor 1 or 3 + * @param nodePool - optional machine list to build a pipeline. + * @throws IOException + */ + Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) + throws IOException; + + /** + * Returns information about SCM. + * + * @return {@link ScmInfo} + * @throws IOException + */ + ScmInfo getScmInfo() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java new file mode 100644 index 0000000..b56a749 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.protocol; http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..0012f3e --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.protocolPB; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .AllocateScmBlockRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .AllocateScmBlockResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .DeleteScmKeyBlocksRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .DeleteScmKeyBlocksResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .GetScmBlockLocationsRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .GetScmBlockLocationsResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .KeyBlocks; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .ScmLocatedBlockProto; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is the client-side translator to translate the requests made on + * the {@link ScmBlockLocationProtocol} interface to the RPC server + * implementing {@link ScmBlockLocationProtocolPB}. + */ +@InterfaceAudience.Private +public final class ScmBlockLocationProtocolClientSideTranslatorPB + implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable { + + /** + * RpcController is not used and hence is set to null. + */ + private static final RpcController NULL_RPC_CONTROLLER = null; + + private final ScmBlockLocationProtocolPB rpcProxy; + + /** + * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. + * + * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy + */ + public ScmBlockLocationProtocolClientSideTranslatorPB( + ScmBlockLocationProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + /** + * Find the set of nodes to read/write a block, as + * identified by the block key. This method supports batch lookup by + * passing multiple keys. + * + * @param keys batch of block keys to find + * @return allocated blocks for each block key + * @throws IOException if there is any failure + */ + @Override + public Set<AllocatedBlock> getBlockLocations(Set<String> keys) + throws IOException { + GetScmBlockLocationsRequestProto.Builder req = + GetScmBlockLocationsRequestProto.newBuilder(); + for (String key : keys) { + req.addKeys(key); + } + final GetScmBlockLocationsResponseProto resp; + try { + resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER, + req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + Set<AllocatedBlock> locatedBlocks = + Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount()); + for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) { + locatedBlocks.add(new AllocatedBlock.Builder() + .setKey(locatedBlock.getKey()) + .setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline())) + .build()); + } + return locatedBlocks; + } + + /** + * Asks SCM where a block should be allocated. SCM responds with the + * set of datanodes that should be used creating this block. + * @param size - size of the block. + * @return allocated block accessing info (key, pipeline). + * @throws IOException + */ + @Override + public AllocatedBlock allocateBlock(long size, + HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, + String owner) throws IOException { + Preconditions.checkArgument(size > 0, "block size must be greater than 0"); + + AllocateScmBlockRequestProto request = + AllocateScmBlockRequestProto.newBuilder().setSize(size).setType(type) + .setFactor(factor).setOwner(owner).build(); + final AllocateScmBlockResponseProto response; + try { + response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + if (response.getErrorCode() != + AllocateScmBlockResponseProto.Error.success) { + throw new IOException(response.hasErrorMessage() ? + response.getErrorMessage() : "Allocate block failed."); + } + AllocatedBlock.Builder builder = new AllocatedBlock.Builder() + .setKey(response.getKey()) + .setPipeline(Pipeline.getFromProtoBuf(response.getPipeline())) + .setShouldCreateContainer(response.getCreateContainer()); + return builder.build(); + } + + /** + * Delete the set of keys specified. + * + * @param keyBlocksInfoList batch of block keys to delete. + * @return list of block deletion results. + * @throws IOException if there is any failure. + * + */ + @Override + public List<DeleteBlockGroupResult> deleteKeyBlocks( + List<BlockGroup> keyBlocksInfoList) throws IOException { + List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream() + .map(BlockGroup::getProto).collect(Collectors.toList()); + DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto + .newBuilder().addAllKeyBlocks(keyBlocksProto).build(); + + final DeleteScmKeyBlocksResponseProto resp; + try { + resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + List<DeleteBlockGroupResult> results = + new ArrayList<>(resp.getResultsCount()); + results.addAll(resp.getResultsList().stream().map( + result -> new DeleteBlockGroupResult(result.getObjectKey(), + DeleteBlockGroupResult + .convertBlockResultProto(result.getBlockResultsList()))) + .collect(Collectors.toList())); + return results; + } + + /** + * Gets the cluster Id and Scm Id from SCM. + * @return ScmInfo + * @throws IOException + */ + @Override + public ScmInfo getScmInfo() throws IOException { + HddsProtos.GetScmInfoRequestProto request = + HddsProtos.GetScmInfoRequestProto.getDefaultInstance(); + HddsProtos.GetScmInfoRespsonseProto resp; + try { + resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + ScmInfo.Builder builder = new ScmInfo.Builder() + .setClusterId(resp.getClusterId()) + .setScmId(resp.getScmId()); + return builder.build(); + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolPB.java new file mode 100644 index 0000000..837c95b --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolPB.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .ScmBlockLocationProtocolService; +import org.apache.hadoop.ipc.ProtocolInfo; + +/** + * Protocol used from an HDFS node to StorageContainerManager. This extends the + * Protocol Buffers service interface to add Hadoop-specific annotations. + */ +@ProtocolInfo(protocolName = + "org.apache.hadoop.ozone.protocol.ScmBlockLocationProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface ScmBlockLocationProtocolPB + extends ScmBlockLocationProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..3638f63 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.protocolPB; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.GetContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.GetContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.NodeQueryRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.NodeQueryResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.PipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.PipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +/** + * This class is the client-side translator to translate the requests made on + * the {@link StorageContainerLocationProtocol} interface to the RPC server + * implementing {@link StorageContainerLocationProtocolPB}. + */ +@InterfaceAudience.Private +public final class StorageContainerLocationProtocolClientSideTranslatorPB + implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable { + + /** + * RpcController is not used and hence is set to null. + */ + private static final RpcController NULL_RPC_CONTROLLER = null; + + private final StorageContainerLocationProtocolPB rpcProxy; + + /** + * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. + * + * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy + */ + public StorageContainerLocationProtocolClientSideTranslatorPB( + StorageContainerLocationProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + /** + * Asks SCM where a container should be allocated. SCM responds with the set + * of datanodes that should be used creating this container. Ozone/SCM only + * supports replication factor of either 1 or 3. + * @param type - Replication Type + * @param factor - Replication Count + * @param containerName - Name + * @return + * @throws IOException + */ + @Override + public Pipeline allocateContainer(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, String + containerName, String owner) throws IOException { + + Preconditions.checkNotNull(containerName, "Container Name cannot be Null"); + Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" + + " be empty"); + ContainerRequestProto request = ContainerRequestProto.newBuilder() + .setContainerName(containerName) + .setReplicationFactor(factor) + .setReplicationType(type) + .setOwner(owner) + .build(); + + final ContainerResponseProto response; + try { + response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + if (response.getErrorCode() != ContainerResponseProto.Error.success) { + throw new IOException(response.hasErrorMessage() ? + response.getErrorMessage() : "Allocate container failed."); + } + return Pipeline.getFromProtoBuf(response.getPipeline()); + } + + public Pipeline getContainer(String containerName) throws IOException { + Preconditions.checkNotNull(containerName, + "Container Name cannot be Null"); + Preconditions.checkState(!containerName.isEmpty(), + "Container name cannot be empty"); + GetContainerRequestProto request = GetContainerRequestProto + .newBuilder() + .setContainerName(containerName) + .build(); + try { + GetContainerResponseProto response = + rpcProxy.getContainer(NULL_RPC_CONTROLLER, request); + return Pipeline.getFromProtoBuf(response.getPipeline()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<ContainerInfo> listContainer(String startName, String prefixName, + int count) throws IOException { + SCMListContainerRequestProto.Builder builder = SCMListContainerRequestProto + .newBuilder(); + if (prefixName != null) { + builder.setPrefixName(prefixName); + } + if (startName != null) { + builder.setStartName(startName); + } + builder.setCount(count); + SCMListContainerRequestProto request = builder.build(); + + try { + SCMListContainerResponseProto response = + rpcProxy.listContainer(NULL_RPC_CONTROLLER, request); + List<ContainerInfo> containerList = new ArrayList<>(); + for (HddsProtos.SCMContainerInfo containerInfoProto : response + .getContainersList()) { + containerList.add(ContainerInfo.fromProtobuf(containerInfoProto)); + } + return containerList; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + /** + * Ask SCM to delete a container by name. SCM will remove + * the container mapping in its database. + * + * @param containerName + * @throws IOException + */ + @Override + public void deleteContainer(String containerName) + throws IOException { + Preconditions.checkState(!Strings.isNullOrEmpty(containerName), + "Container name cannot be null or empty"); + SCMDeleteContainerRequestProto request = SCMDeleteContainerRequestProto + .newBuilder() + .setContainerName(containerName) + .build(); + try { + rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + /** + * Queries a list of Node Statuses. + * + * @param nodeStatuses + * @return List of Datanodes. + */ + @Override + public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> + nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) + throws IOException { + // TODO : We support only cluster wide query right now. So ignoring checking + // queryScope and poolName + Preconditions.checkNotNull(nodeStatuses); + Preconditions.checkState(nodeStatuses.size() > 0); + NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder() + .addAllQuery(nodeStatuses) + .setScope(queryScope).setPoolName(poolName).build(); + try { + NodeQueryResponseProto response = + rpcProxy.queryNode(NULL_RPC_CONTROLLER, request); + return response.getDatanodes(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + + } + + /** + * Notify from client that creates object on datanodes. + * @param type object type + * @param name object name + * @param op operation type (e.g., create, close, delete) + * @param stage object creation stage : begin/complete + */ + @Override + public void notifyObjectStageChange( + ObjectStageChangeRequestProto.Type type, String name, + ObjectStageChangeRequestProto.Op op, + ObjectStageChangeRequestProto.Stage stage) throws IOException { + Preconditions.checkState(!Strings.isNullOrEmpty(name), + "Object name cannot be null or empty"); + ObjectStageChangeRequestProto request = + ObjectStageChangeRequestProto.newBuilder() + .setType(type) + .setName(name) + .setOp(op) + .setStage(stage) + .build(); + try { + rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request); + } catch(ServiceException e){ + throw ProtobufHelper.getRemoteException(e); + } + } + + /** + * Creates a replication pipeline of a specified type. + * + * @param replicationType - replication type + * @param factor - factor 1 or 3 + * @param nodePool - optional machine list to build a pipeline. + * @throws IOException + */ + @Override + public Pipeline createReplicationPipeline(HddsProtos.ReplicationType + replicationType, HddsProtos.ReplicationFactor factor, HddsProtos + .NodePool nodePool) throws IOException { + PipelineRequestProto request = PipelineRequestProto.newBuilder() + .setNodePool(nodePool) + .setReplicationFactor(factor) + .setReplicationType(replicationType) + .build(); + try { + PipelineResponseProto response = + rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request); + if (response.getErrorCode() == + PipelineResponseProto.Error.success) { + Preconditions.checkState(response.hasPipeline(), "With success, " + + "must come a pipeline"); + return Pipeline.getFromProtoBuf(response.getPipeline()); + } else { + String errorMessage = String.format("create replication pipeline " + + "failed. code : %s Message: %s", response.getErrorCode(), + response.hasErrorMessage() ? response.getErrorMessage() : ""); + throw new IOException(errorMessage); + } + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public ScmInfo getScmInfo() throws IOException { + HddsProtos.GetScmInfoRequestProto request = + HddsProtos.GetScmInfoRequestProto.getDefaultInstance(); + try { + HddsProtos.GetScmInfoRespsonseProto resp = rpcProxy.getScmInfo( + NULL_RPC_CONTROLLER, request); + ScmInfo.Builder builder = new ScmInfo.Builder() + .setClusterId(resp.getClusterId()) + .setScmId(resp.getScmId()); + return builder.build(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org