http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java new file mode 100644 index 0000000..33a5971 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; +import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; +import org.apache.hadoop.ozone.container.common.impl.Dispatcher; +import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl; +import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; +import org.apache.hadoop.ozone.container.common.statemachine.background + .BlockDeletingService; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .XceiverServerRatis; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; + +/** + * Ozone main class sets up the network server and initializes the container + * layer. + */ +public class OzoneContainer { + private static final Logger LOG = + LoggerFactory.getLogger(OzoneContainer.class); + + private final Configuration ozoneConfig; + private final ContainerDispatcher dispatcher; + private final ContainerManager manager; + private final XceiverServerSpi[] server; + private final ChunkManager chunkManager; + private final KeyManager keyManager; + private final BlockDeletingService blockDeletingService; + + /** + * Creates a network endpoint and enables Ozone container. 
+ * + * @param ozoneConfig - Config + * @throws IOException + */ + public OzoneContainer( + DatanodeDetails datanodeDetails, Configuration ozoneConfig) + throws IOException { + this.ozoneConfig = ozoneConfig; + List<StorageLocation> locations = new LinkedList<>(); + String[] paths = ozoneConfig.getStrings( + OzoneConfigKeys.OZONE_METADATA_DIRS); + if (paths != null && paths.length > 0) { + for (String p : paths) { + locations.add(StorageLocation.parse( + Paths.get(p).resolve(CONTAINER_ROOT_PREFIX).toString())); + } + } else { + getDataDir(locations); + } + + manager = new ContainerManagerImpl(); + manager.init(this.ozoneConfig, locations, datanodeDetails); + this.chunkManager = new ChunkManagerImpl(manager); + manager.setChunkManager(this.chunkManager); + + this.keyManager = new KeyManagerImpl(manager, ozoneConfig); + manager.setKeyManager(this.keyManager); + + long svcInterval = + ozoneConfig.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + long serviceTimeout = ozoneConfig.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + this.blockDeletingService = new BlockDeletingService(manager, + svcInterval, serviceTimeout, ozoneConfig); + + this.dispatcher = new Dispatcher(manager, this.ozoneConfig); + + server = new XceiverServerSpi[]{ + new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher), + XceiverServerRatis + .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher) + }; + } + + /** + * Starts serving requests to ozone container. + * + * @throws IOException + */ + public void start() throws IOException { + for (XceiverServerSpi serverinstance : server) { + serverinstance.start(); + } + blockDeletingService.start(); + dispatcher.init(); + } + + /** + * Stops the ozone container. + * <p> + * Shutdown logic is not very obvious from the following code. 
if you need to + * modify the logic, please keep these comments in mind. Here is the shutdown + * sequence. + * <p> + * 1. We shutdown the network ports. + * <p> + * 2. Now we need to wait for all requests in-flight to finish. + * <p> + * 3. The container manager lock is a read-write lock with "Fairness" + * enabled. + * <p> + * 4. This means that the waiting threads are served in a "first-come-first + * -served" manner. Please note that this applies to waiting threads only. + * <p> + * 5. Since write locks are exclusive, if we are waiting to get a lock it + * implies that we are waiting for in-flight operations to complete. + * <p> + * 6. if there are other write operations waiting on the reader-writer lock, + * fairness guarantees that they will proceed before the shutdown lock + * request. + * <p> + * 7. Since all operations either take a reader or writer lock of container + * manager, we are guaranteed that we are the last operation since we have + * closed the network port, and we wait until close is successful. + * <p> + * 8. We take the writer lock and call shutdown on each of the managers in + * reverse order. That is chunkManager, keyManager and containerManager is + * shutdown. + */ + public void stop() { + LOG.info("Attempting to stop container services."); + for(XceiverServerSpi serverinstance: server) { + serverinstance.stop(); + } + dispatcher.shutdown(); + + try { + this.manager.writeLock(); + this.chunkManager.shutdown(); + this.keyManager.shutdown(); + this.manager.shutdown(); + this.blockDeletingService.shutdown(); + LOG.info("container services shutdown complete."); + } catch (IOException ex) { + LOG.warn("container service shutdown error:", ex); + } finally { + this.manager.writeUnlock(); + } + } + + /** + * Returns a paths to data dirs. + * + * @param pathList - List of paths. 
+ * @throws IOException + */ + private void getDataDir(List<StorageLocation> pathList) throws IOException { + for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) { + StorageLocation location = StorageLocation.parse(dir); + pathList.add(location); + } + } + + /** + * Returns node report of container storage usage. + */ + public SCMNodeReport getNodeReport() throws IOException { + return this.manager.getNodeReport(); + } + + private int getPortbyType(HddsProtos.ReplicationType replicationType) { + for (XceiverServerSpi serverinstance : server) { + if (serverinstance.getServerType() == replicationType) { + return serverinstance.getIPCPort(); + } + } + return INVALID_PORT; + } + + /** + * Returns the container server IPC port. + * + * @return Container server IPC port. + */ + public int getContainerServerPort() { + return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE); + } + + /** + * Returns the Ratis container Server IPC port. + * + * @return Ratis port. + */ + public int getRatisContainerServerPort() { + return getPortbyType(HddsProtos.ReplicationType.RATIS); + } + + /** + * Returns container report. + * @return - container report. + * @throws IOException + */ + public ContainerReportsRequestProto getContainerReport() throws IOException { + return this.manager.getContainerReport(); + } + +// TODO: remove getContainerReports + /** + * Returns the list of closed containers. + * @return - List of closed containers. + * @throws IOException + */ + public List<ContainerData> getContainerReports() throws IOException { + return this.manager.getContainerReports(); + } + + @VisibleForTesting + public ContainerManager getContainerManager() { + return this.manager; + } + + /** + * Get the container report state to send via HB to SCM. + * @return the container report state. + */ + public ReportState getContainerReportState() { + return this.manager.getContainerReportState(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java new file mode 100644 index 0000000..c99c038 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.ozoneimpl; +/** + Ozone main that calls into the container layer +**/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java new file mode 100644 index 0000000..1a51012 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone; + +/** + * Generic ozone specific classes. 
+ */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java new file mode 100644 index 0000000..43e7412 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; + +import java.io.IOException; + +/** + * The protocol spoken between datanodes and SCM. For specifics please the + * Protoc file that defines this protocol. + */ [email protected] +public interface StorageContainerDatanodeProtocol { + /** + * Returns SCM version. + * @return Version info. + */ + SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest) + throws IOException; + + /** + * Used by data node to send a Heartbeat. + * @param datanodeDetails - Datanode Details. + * @param nodeReport - node report state + * @param reportState - container report state. 
+ * @return - SCMHeartbeatResponseProto + * @throws IOException + */ + SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails, + SCMNodeReport nodeReport, ReportState reportState) throws IOException; + + /** + * Register Datanode. + * @param datanodeDetails - Datanode Details. + * @param scmAddresses - List of SCMs this datanode is configured to + * communicate. + * @return SCM Command. + */ + SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails, + String[] scmAddresses) throws IOException; + + /** + * Send a container report. + * @param reports -- Container report. + * @return container reports response. + * @throws IOException + */ + ContainerReportsResponseProto sendContainerReport( + ContainerReportsRequestProto reports) throws IOException; + + /** + * Used by datanode to send block deletion ACK to SCM. + * @param request block deletion transactions. + * @return block deletion transaction response. + * @throws IOException + */ + ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + ContainerBlocksDeletionACKProto request) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java new file mode 100644 index 0000000..1fc7c57 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +import java.util.List; + +/** + * The protocol spoken between datanodes and SCM. + * + * Please note that the full protocol spoken between a datanode and SCM is + * separated into 2 interfaces. One interface that deals with node state and + * another interface that deals with containers. + * + * This interface has functions that deals with the state of datanode. + */ [email protected] +public interface StorageContainerNodeProtocol { + /** + * Gets the version info from SCM. + * @param versionRequest - version Request. + * @return - returns SCM version info and other required information needed + * by datanode. 
+ */ + VersionResponse getVersion(SCMVersionRequestProto versionRequest); + + /** + * Register the node if the node finds that it is not registered with any SCM. + * @param datanodeDetails DatanodeDetails + * @return SCMHeartbeatResponseProto + */ + SCMCommand register(DatanodeDetailsProto datanodeDetails); + + /** + * Send heartbeat to indicate the datanode is alive and doing well. + * @param datanodeDetails - Datanode ID. + * @param nodeReport - node report. + * @param reportState - container report. + * @return SCMheartbeat response list + */ + List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails, + SCMNodeReport nodeReport, ReportState reportState); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java new file mode 100644 index 0000000..83acf5b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java @@ -0,0 +1,150 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Version response class. + */ +public class VersionResponse { + private final int version; + private final Map<String, String> values; + + /** + * Creates a version response class. + * @param version + * @param values + */ + public VersionResponse(int version, Map<String, String> values) { + this.version = version; + this.values = values; + } + + /** + * Creates a version Response class. + * @param version + */ + public VersionResponse(int version) { + this.version = version; + this.values = new HashMap<>(); + } + + /** + * Returns a new Builder. + * @return - Builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Returns this class from protobuf message. + * @param response - SCMVersionResponseProto + * @return VersionResponse + */ + public static VersionResponse getFromProtobuf(SCMVersionResponseProto + response) { + return new VersionResponse(response.getSoftwareVersion(), + response.getKeysList().stream() + .collect(Collectors.toMap(KeyValue::getKey, + KeyValue::getValue))); + } + + /** + * Adds a value to version Response. 
+ * @param key - String + * @param value - String + */ + public void put(String key, String value) { + if (this.values.containsKey(key)) { + throw new IllegalArgumentException("Duplicate key in version response"); + } + values.put(key, value); + } + + /** + * Return a protobuf message. + * @return SCMVersionResponseProto. + */ + public SCMVersionResponseProto getProtobufMessage() { + + List<KeyValue> list = new LinkedList<>(); + for (Map.Entry<String, String> entry : values.entrySet()) { + list.add(KeyValue.newBuilder().setKey(entry.getKey()). + setValue(entry.getValue()).build()); + } + return + SCMVersionResponseProto.newBuilder() + .setSoftwareVersion(this.version) + .addAllKeys(list).build(); + } + + /** + * Builder class. + */ + public static class Builder { + private int version; + private Map<String, String> values; + + Builder() { + values = new HashMap<>(); + } + + /** + * Sets the version. + * @param ver - version + * @return Builder + */ + public Builder setVersion(int ver) { + this.version = ver; + return this; + } + + /** + * Adds a value to version Response. + * @param key - String + * @param value - String + */ + public Builder addValue(String key, String value) { + if (this.values.containsKey(key)) { + throw new IllegalArgumentException("Duplicate key in version response"); + } + values.put(key, value); + return this; + } + + /** + * Builds the version response. + * @return VersionResponse. 
+ */ + public VersionResponse build() { + return new VersionResponse(this.version, this.values); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java new file mode 100644 index 0000000..b1cdbc4 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; + +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand; + +/** + * Asks datanode to close a container. + */ +public class CloseContainerCommand + extends SCMCommand<SCMCloseContainerCmdResponseProto> { + + private String containerName; + + public CloseContainerCommand(String containerName) { + this.containerName = containerName; + } + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public SCMCmdType getType() { + return closeContainerCommand; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public SCMCloseContainerCmdResponseProto getProto() { + return SCMCloseContainerCmdResponseProto.newBuilder() + .setContainerName(containerName).build(); + } + + public static CloseContainerCommand getFromProtobuf( + SCMCloseContainerCmdResponseProto closeContainerProto) { + Preconditions.checkNotNull(closeContainerProto); + return new CloseContainerCommand(closeContainerProto.getContainerName()); + + } + + public String getContainerName() { + return containerName; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java new file mode 100644 index 0000000..a11ca25 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto; + +import java.util.List; + +/** + * A SCM command asks a datanode to delete a number of blocks. 
+ */ +public class DeleteBlocksCommand extends + SCMCommand<SCMDeleteBlocksCmdResponseProto> { + + private List<DeletedBlocksTransaction> blocksTobeDeleted; + + + public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) { + this.blocksTobeDeleted = blocks; + } + + public List<DeletedBlocksTransaction> blocksTobeDeleted() { + return this.blocksTobeDeleted; + } + + @Override + public SCMCmdType getType() { + return SCMCmdType.deleteBlocksCommand; + } + + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public static DeleteBlocksCommand getFromProtobuf( + SCMDeleteBlocksCmdResponseProto deleteBlocksProto) { + return new DeleteBlocksCommand(deleteBlocksProto + .getDeletedBlocksTransactionsList()); + } + + public SCMDeleteBlocksCmdResponseProto getProto() { + return SCMDeleteBlocksCmdResponseProto.newBuilder() + .addAllDeletedBlocksTransactions(blocksTobeDeleted).build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java new file mode 100644 index 0000000..88b5911 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .ErrorCode; + +/** + * Response to Datanode Register call. + */ +public class RegisteredCommand extends + SCMCommand<SCMRegisteredCmdResponseProto> { + private String datanodeUUID; + private String clusterID; + private ErrorCode error; + + public RegisteredCommand(final ErrorCode error, final String datanodeUUID, + final String clusterID) { + this.datanodeUUID = datanodeUUID; + this.clusterID = clusterID; + this.error = error; + } + + /** + * Returns a new builder. + * + * @return - Builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public SCMCmdType getType() { + return SCMCmdType.registeredCommand; + } + + /** + * Returns datanode UUID. + * + * @return - Datanode ID. + */ + public String getDatanodeUUID() { + return datanodeUUID; + } + + /** + * Returns cluster ID. + * + * @return -- ClusterID + */ + public String getClusterID() { + return clusterID; + } + + /** + * Returns ErrorCode. 
+ * + * @return - ErrorCode + */ + public ErrorCode getError() { + return error; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public byte[] getProtoBufMessage() { + return SCMRegisteredCmdResponseProto.newBuilder() + .setClusterID(this.clusterID) + .setDatanodeUUID(this.datanodeUUID) + .setErrorCode(this.error) + .build().toByteArray(); + } + + /** + * A builder class to verify all values are sane. + */ + public static class Builder { + private String datanodeUUID; + private String clusterID; + private ErrorCode error; + + /** + * sets UUID. + * + * @param dnUUID - datanode UUID + * @return Builder + */ + public Builder setDatanodeUUID(String dnUUID) { + this.datanodeUUID = dnUUID; + return this; + } + + /** + * Create this object from a Protobuf message. + * + * @param response - RegisteredCmdResponseProto + * @return RegisteredCommand + */ + public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto + response) { + Preconditions.checkNotNull(response); + return new RegisteredCommand(response.getErrorCode(), + response.hasDatanodeUUID() ? response.getDatanodeUUID(): "", + response.hasClusterID() ? response.getClusterID(): ""); + } + + /** + * Sets cluster ID. + * + * @param cluster - clusterID + * @return Builder + */ + public Builder setClusterID(String cluster) { + this.clusterID = cluster; + return this; + } + + /** + * Sets Error code. + * + * @param errorCode - error code + * @return Builder + */ + public Builder setErrorCode(ErrorCode errorCode) { + this.error = errorCode; + return this; + } + + /** + * Build the command object. 
+ * + * @return RegisteredCommand + */ + public RegisteredCommand build() { + if ((this.error == ErrorCode.success) && + (this.datanodeUUID == null || this.datanodeUUID.isEmpty()) || + (this.clusterID == null || this.clusterID.isEmpty())) { + throw new IllegalArgumentException("On success, RegisteredCommand " + + "needs datanodeUUID and ClusterID."); + } + + return new + RegisteredCommand(this.error, this.datanodeUUID, this.clusterID); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java new file mode 100644 index 0000000..c167d59 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; + +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; + +/** + * Informs a datanode to register itself with SCM again. + */ +public class ReregisterCommand extends + SCMCommand<SCMReregisterCmdResponseProto>{ + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public SCMCmdType getType() { + return reregisterCommand; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public SCMReregisterCmdResponseProto getProto() { + return SCMReregisterCmdResponseProto + .newBuilder() + .build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java new file mode 100644 index 0000000..73e4194 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; + +/** + * A class that acts as the base class to convert between Java and SCM + * commands in protobuf format. + * @param <T> + */ +public abstract class SCMCommand<T extends GeneratedMessage> { + /** + * Returns the type of this command. + * @return Type + */ + public abstract SCMCmdType getType(); + + /** + * Gets the protobuf message of this object. + * @return A protobuf message. 
+ */ + public abstract byte[] getProtoBufMessage(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java new file mode 100644 index 0000000..8431752 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SendContainerReportProto; + +/** + * Allows a Datanode to send in the container report. 
+ */ +public class SendContainerCommand extends SCMCommand<SendContainerReportProto> { + /** + * Returns a SendContainerCommand from a SendContainerReportProto. + * @param unused - ignored by this method + * @return SendContainerCommand + */ + public static SendContainerCommand getFromProtobuf( + final SendContainerReportProto unused) { + return new SendContainerCommand(); + } + + /** + * Returns a new builder. + * @return Builder + */ + public static SendContainerCommand.Builder newBuilder() { + return new SendContainerCommand.Builder(); + } + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public SCMCmdType getType() { + return SCMCmdType.sendContainerReport; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public byte[] getProtoBufMessage() { + return SendContainerReportProto.newBuilder().build().toByteArray(); + } + + /** + * A Builder class; the standard pattern we use for all commands. + */ + public static class Builder { + /** + * Builds a SendContainerCommand. + * @return - SendContainerCommand. + */ + public SendContainerCommand build() { + return new SendContainerCommand(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java new file mode 100644 index 0000000..7083c1b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; +/** + Set of classes that help in protoc conversions. + **/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java new file mode 100644 index 0000000..a718fa7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol; + +/** + * This package contains classes for HDDS protocol definitions. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..12fed1c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto + 
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This class is the client-side translator to translate the requests made on + * the {@link StorageContainerDatanodeProtocol} interface to the RPC server + * implementing {@link StorageContainerDatanodeProtocolPB}. + */ +public class StorageContainerDatanodeProtocolClientSideTranslatorPB + implements StorageContainerDatanodeProtocol, ProtocolTranslator, Closeable { + + /** + * RpcController is not used and hence is set to null. + */ + private static final RpcController NULL_RPC_CONTROLLER = null; + private final StorageContainerDatanodeProtocolPB rpcProxy; + + /** + * Constructs a Client side interface that calls into SCM datanode protocol. + * + * @param rpcProxy - Proxy for RPC. + */ + public StorageContainerDatanodeProtocolClientSideTranslatorPB( + StorageContainerDatanodeProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + * <p> + * <p> As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally <em>mark</em> the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + RPC.stopProxy(rpcProxy); + } + + /** + * Return the proxy object underlying this protocol translator. + * + * @return the proxy object underlying this protocol translator. 
+ */ + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + /** + * Returns SCM version. + * + * @param unused - set to null and unused. + * @return Version info. + */ + @Override + public SCMVersionResponseProto getVersion(SCMVersionRequestProto + unused) throws IOException { + SCMVersionRequestProto request = + SCMVersionRequestProto.newBuilder().build(); + final SCMVersionResponseProto response; + try { + response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request); + } catch (ServiceException ex) { + throw ProtobufHelper.getRemoteException(ex); + } + return response; + } + + /** + * Send by datanode to SCM. + * + * @param datanodeDetailsProto - Datanode Details + * @param nodeReport - node report + * @throws IOException + */ + + @Override + public SCMHeartbeatResponseProto sendHeartbeat( + DatanodeDetailsProto datanodeDetailsProto, + SCMNodeReport nodeReport, ReportState reportState) throws IOException { + SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto + .newBuilder(); + req.setDatanodeDetails(datanodeDetailsProto); + req.setNodeReport(nodeReport); + req.setContainerReportState(reportState); + final SCMHeartbeatResponseProto resp; + try { + resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return resp; + } + + /** + * Register Datanode. + * + * @param datanodeDetailsProto - Datanode Details + * @return SCM Command. 
+ */ + @Override + public SCMRegisteredCmdResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, + String[] scmAddresses) throws IOException { + SCMRegisterRequestProto.Builder req = + SCMRegisterRequestProto.newBuilder(); + req.setDatanodeDetails(datanodeDetailsProto); + final SCMRegisteredCmdResponseProto response; + try { + response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return response; + } + + /** + * Send a container report. + * + * @param reports -- Container report + * @return HeartbeatRespose.nullcommand. + * @throws IOException + */ + @Override + public ContainerReportsResponseProto sendContainerReport( + ContainerReportsRequestProto reports) throws IOException { + final ContainerReportsResponseProto resp; + try { + resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return resp; + } + + @Override + public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + ContainerBlocksDeletionACKProto deletedBlocks) throws IOException { + final ContainerBlocksDeletionACKResponseProto resp; + try { + resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER, + deletedBlocks); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return resp; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java new file mode 100644 index 0000000..9b28b5a --- 
/dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .StorageContainerDatanodeProtocolService; +import org.apache.hadoop.ipc.ProtocolInfo; + +/** + * Protocol used from a datanode to StorageContainerManager. This extends + * the Protocol Buffers service interface to add Hadoop-specific annotations. 
+ */ + +@ProtocolInfo(protocolName = + "org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol", + protocolVersion = 1) +public interface StorageContainerDatanodeProtocolPB extends + StorageContainerDatanodeProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..985b75a --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; + +import java.io.IOException; + +/** + * This class is the server-side translator that forwards requests received on + * {@link StorageContainerDatanodeProtocolPB} to the {@link + * StorageContainerDatanodeProtocol} server implementation. 
+ */ +public class StorageContainerDatanodeProtocolServerSideTranslatorPB + implements StorageContainerDatanodeProtocolPB { + + private final StorageContainerDatanodeProtocol impl; + + public StorageContainerDatanodeProtocolServerSideTranslatorPB( + StorageContainerDatanodeProtocol impl) { + this.impl = impl; + } + + @Override + public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto + getVersion(RpcController controller, + StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request) + throws ServiceException { + try { + return impl.getVersion(request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + register(RpcController controller, StorageContainerDatanodeProtocolProtos + .SCMRegisterRequestProto request) throws ServiceException { + String[] addressArray = null; + + if (request.hasAddressList()) { + addressArray = request.getAddressList().getAddressListList() + .toArray(new String[0]); + } + + try { + return impl.register(request.getDatanodeDetails(), addressArray); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SCMHeartbeatResponseProto + sendHeartbeat(RpcController controller, + SCMHeartbeatRequestProto request) throws ServiceException { + try { + return impl.sendHeartbeat(request.getDatanodeDetails(), + request.getNodeReport(), + request.getContainerReportState()); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ContainerReportsResponseProto sendContainerReport( + RpcController controller, ContainerReportsRequestProto request) + throws ServiceException { + try { + return impl.sendContainerReport(request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + RpcController controller, ContainerBlocksDeletionACKProto request) + 
throws ServiceException { + try { + return impl.sendContainerBlocksDeletionACK(request); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto new file mode 100644 index 0000000..cd415e2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *unstable* .proto interface. 
+ */ + +option java_package = "org.apache.hadoop.hdds.protocol.proto"; + +option java_outer_classname = "StorageContainerDatanodeProtocolProtos"; + +option java_generic_services = true; + +option java_generate_equals_and_hash = true; + +package hadoop.hdds; + +import "hdds.proto"; + + +/** +* This message is sent by a datanode to indicate that it is alive or it is +* registering with the node manager. +*/ +message SCMHeartbeatRequestProto { + required DatanodeDetailsProto datanodeDetails = 1; + optional SCMNodeReport nodeReport = 2; + optional ReportState containerReportState = 3; +} + +enum DatanodeContainerState { + closed = 0; + open = 1; +} + +/** +ReportState contains messages from datanode to SCM saying that it has +some information that SCM might be interested in.*/ +message ReportState { + enum states { + noContainerReports = 0; + completeContinerReport = 1; + deltaContainerReport = 2; + } + required states state = 1; + required int64 count = 2 [default = 0]; +} + + +/** +This message is used to persist the information about a container in the +SCM database. This information allows SCM to start up faster and avoid having +all container info in memory all the time. + */ +message ContainerPersistanceProto { + required DatanodeContainerState state = 1; + required hadoop.hdds.Pipeline pipeline = 2; + required ContainerInfo info = 3; +} + +/** +This message is used to do a quick look up of which containers are affected +if a node goes down. +*/ +message NodeContianerMapping { + repeated string contianerName = 1; +} + +/** +A container report contains the following information.
+*/ +message ContainerInfo { + required string containerName = 1; + optional string finalhash = 2; + optional int64 size = 3; + optional int64 used = 4; + optional int64 keyCount = 5; + // TODO: move the I/O counts to a separate message + optional int64 readCount = 6; + optional int64 writeCount = 7; + optional int64 readBytes = 8; + optional int64 writeBytes = 9; + required int64 containerID = 10; + optional hadoop.hdds.LifeCycleState state = 11; +} + +// The deleted blocks which are stored in deletedBlock.db of SCM. +message DeletedBlocksTransaction { + required int64 txID = 1; + required string containerName = 2; + repeated string blockID = 3; + // the retry count for sending the delete command to the datanode. + required int32 count = 4; +} + +/** +A set of container reports. The max count is generally set to +8192 since that keeps the size of the reports under 1 MB. +*/ +message ContainerReportsRequestProto { + enum reportType { + fullReport = 0; + deltaReport = 1; + } + required DatanodeDetailsProto datanodeDetails = 1; + repeated ContainerInfo reports = 2; + required reportType type = 3; +} + +message ContainerReportsResponseProto { +} + +/** +* This message is sent along with the heartbeat to report datanode +* storage utilization by SCM. +*/ +message SCMNodeReport { + repeated SCMStorageReport storageReport = 1; +} + +message SCMStorageReport { + required string storageUuid = 1; + optional uint64 capacity = 2 [default = 0]; + optional uint64 scmUsed = 3 [default = 0]; + optional uint64 remaining = 4 [default = 0]; + //optional hadoop.hdfs.StorageTypeProto storageType = 5 [default = DISK]; +} + +message SCMRegisterRequestProto { + required DatanodeDetailsProto datanodeDetails = 1; + optional SCMNodeAddressList addressList = 2; +} + +/** + * Request for version info of the software stack on the server. + */ +message SCMVersionRequestProto { + +} + +/** +* Generic response that is sent to a version request.
This allows keys to be +* added on the fly and protocol to remain stable. +*/ +message SCMVersionResponseProto { + required uint32 softwareVersion = 1; + repeated hadoop.hdds.KeyValue keys = 2; +} + +message SCMNodeAddressList { + repeated string addressList = 1; +} + +/** + * Datanode ID returned by the SCM. This is similar to the NameNode + * registration of a datanode. + */ +message SCMRegisteredCmdResponseProto { + enum ErrorCode { + success = 1; + errorNodeNotPermitted = 2; + } + required ErrorCode errorCode = 2; + optional string datanodeUUID = 3; + optional string clusterID = 4; + optional SCMNodeAddressList addressList = 5; +} + +/** + * SCM informs a datanode to register itself again. + * On receiving this command, the datanode will transition to the REGISTER state. + */ +message SCMReregisterCmdResponseProto {} + +/** +This command tells the datanode to send in the container report when possible. +*/ +message SendContainerReportProto { +} + +/** +This command asks the datanode to close a specific container. +*/ +message SCMCloseContainerCmdResponseProto { + required string containerName = 1; +} + +/** +Types of commands supported by the SCM-to-datanode protocol. +*/ +enum SCMCmdType { + versionCommand = 2; + registeredCommand = 3; + sendContainerReport = 4; + reregisterCommand = 5; + deleteBlocksCommand = 6; + closeContainerCommand = 7; +} + +/* + * These are commands returned by SCM for the datanode to execute.
+ */ +message SCMCommandResponseProto { + required SCMCmdType cmdType = 2; // Type of the command + optional SCMRegisteredCmdResponseProto registeredProto = 3; + optional SCMVersionResponseProto versionProto = 4; + optional SendContainerReportProto sendReport = 5; + optional SCMReregisterCmdResponseProto reregisterProto = 6; + optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7; + required string datanodeUUID = 8; + optional SCMCloseContainerCmdResponseProto closeContainerProto = 9; +} + + +/* + * A group of commands for the datanode to execute. + */ +message SCMHeartbeatResponseProto { + repeated SCMCommandResponseProto commands = 1; +} + +// HB response from SCM; contains a list of block deletion transactions. +message SCMDeleteBlocksCmdResponseProto { + repeated DeletedBlocksTransaction deletedBlocksTransactions = 1; +} + +// SendACK response returned by SCM to the datanode, currently empty. +message ContainerBlocksDeletionACKResponseProto { +} + +// ACK message the datanode sends to SCM; contains the results of +// block deletion transactions. +message ContainerBlocksDeletionACKProto { + message DeleteBlockTransactionResult { + required int64 txID = 1; + required bool success = 2; + } + repeated DeleteBlockTransactionResult results = 1; +} + +/** + * Protocol used from a datanode to StorageContainerManager. + * + * Please see the request and response messages for details of the RPC calls. + * + * Here is a simple state diagram that shows how a datanode would boot up and + * communicate with SCM.
+ * + * ----------------------- + * | Start | + * ---------- ------------ + * | + * | + * | + * | + * | + * | + * | + * ----------v------------- + * | Searching for SCM ------------ + * ---------- ------------- | + * | | + * | | + * | ----------v------------- + * | | Register if needed | + * | ----------- ------------ + * | | + * v | + * ----------- ---------------- | + * --------- Heartbeat state <-------- + * | --------^------------------- + * | | + * | | + * | | + * | | + * | | + * | | + * | | + * ------------------ + * + * + * + * Here is how this protocol is used by the datanode. When a datanode boots up + * it moves into a state called SEARCHING_SCM. In this state the datanode is + * trying to establish communication with the SCM. The addresses of the SCMs are + * retrieved from the configuration information. + * + * In the SEARCHING_SCM state, the only RPC call made by the datanode is a getVersion + * call to SCM. Once any of the SCMs replies, the datanode checks if it has a local + * persisted datanode ID. If it has, this means that this datanode is already + * registered with some SCM. If this file is not found, the datanode assumes that + * it needs to do a registration. + * + * If registration is needed, the datanode moves into the REGISTER state. It will + * send a register call with the DatanodeDetailsProto data structure and persist + * that info. + * + * The response to the command contains clusterID. This information is + * also persisted by the datanode, which then moves into the heartbeat state. + * + * Once in the heartbeat state, the datanode sends heartbeats and container reports + * to SCM and processes commands issued by SCM until it is shut down. + * + */ +service StorageContainerDatanodeProtocolService { + + /** + * Gets the version information from the SCM. + */ + rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto); + + /** + * Registers a data node with SCM.
+ */ + rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto); + + /** + * Sends a heartbeat from the datanode to SCM. HBs under SCM look more + * like a lifeline protocol than HBs under HDFS. In other words, they are + * extremely lightweight and contain no data payload. + */ + rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto); + + /** + sendContainerReport sends the container report to SCM. This will + return an empty ContainerReportsResponseProto as the response. + */ + rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto); + + /** + * Sends the block deletion ACK to SCM. + */ + rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider b/hadoop-hdds/container-service/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider new file mode 100644 index 0000000..2e103fe --- /dev/null +++ b/hadoop-hdds/container-service/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainerProvider http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java new file mode 100644 index 0000000..9db792b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.ozone.container.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; + +import java.net.InetSocketAddress; + +/** + * Helper utilities for container tests. + */ +public class ContainerTestUtils { + + private ContainerTestUtils() { + } + + /** + * Creates an EndpointStateMachine backed by a protobuf RPC proxy, for tests. + * + * @param conf - configuration used for the RPC engine, proxy and endpoint + * @param address - socket address the RPC proxy connects to + * @param rpcTimeout - RPC timeout passed to RPC.getProtocolProxy + * @return EndpointStateMachine wrapping the client-side translator + * @throws Exception if creating the proxy or the endpoint fails + */ + public static EndpointStateMachine createEndpoint(Configuration conf, + InetSocketAddress address, int rpcTimeout) throws Exception { + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); + + StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy( + StorageContainerDatanodeProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout, + RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy(); + + StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = + new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + return new EndpointStateMachine(address, rpcClient, conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java new file mode 100644 index 0000000..b63c5fb --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common; + +import com.google.protobuf.BlockingService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .StorageContainerDatanodeProtocolService; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolServerSideTranslatorPB; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +/** + * Test utilities: SCM datanode-protocol RPC server, addresses and configs. + */ +public final class SCMTestUtils { + /** + * Never constructed. + */ + private SCMTestUtils() { + } + + /** + * Builds an RPC server for the given protocol; it is not started here. + * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param handlerCount RPC server handler count + * @return RPC server, built but not yet started + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(Configuration conf, + InetSocketAddress addr, Class<?> + protocol, BlockingService instance, int handlerCount) + throws IOException { + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(addr.getHostString()) + .setPort(addr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + + /** + * Builds and starts an RPC server for the SCM datanode protocol.
+ */ + public static RPC.Server startScmRpcServer(Configuration configuration, + StorageContainerDatanodeProtocol server, + InetSocketAddress rpcServerAddresss, int handlerCount) throws + IOException { + RPC.setProtocolEngine(configuration, + StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService scmDatanodeService = + StorageContainerDatanodeProtocolService. + newReflectiveBlockingService( + new StorageContainerDatanodeProtocolServerSideTranslatorPB( + server)); + + RPC.Server scmServer = startRpcServer(configuration, rpcServerAddresss, + StorageContainerDatanodeProtocolPB.class, scmDatanodeService, + handlerCount); + + scmServer.start(); + return scmServer; + } + + public static InetSocketAddress getReuseableAddress() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + int port = socket.getLocalPort(); + String addr = InetAddress.getLoopbackAddress().getHostAddress(); + return new InetSocketAddress(addr, port); + } + } + + public static Configuration getConf() { + return new Configuration(); + } + + public static OzoneConfiguration getOzoneConf() { + return new OzoneConfiguration(); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
