http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java new file mode 100644 index 0000000..ac245d5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses; + +/** + * Init Datanode State is the task that gets run when we are in Init State. + */ +public class InitDatanodeState implements DatanodeState, + Callable<DatanodeStateMachine.DatanodeStates> { + static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private Future<DatanodeStateMachine.DatanodeStates> result; + + /** + * Create InitDatanodeState Task. + * + * @param conf - Conf + * @param connectionManager - Connection Manager + * @param context - Current Context + */ + public InitDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.conf = conf; + this.connectionManager = connectionManager; + this.context = context; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public DatanodeStateMachine.DatanodeStates call() throws Exception { + Collection<InetSocketAddress> addresses = null; + try { + addresses = getSCMAddresses(conf); + } catch (IllegalArgumentException e) { + if(!Strings.isNullOrEmpty(e.getMessage())) { + LOG.error("Failed to get SCM addresses: " + e.getMessage()); + } + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + + if (addresses == null || addresses.isEmpty()) { + LOG.error("Null or empty SCM address list found."); + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } else { + for (InetSocketAddress addr : addresses) { + connectionManager.addSCMServer(addr); + } + } + + // If datanode ID is set, persist it to the ID file. + persistContainerDatanodeDetails(); + + return this.context.getState().getNextState(); + } + + /** + * Persist DatanodeDetails to datanode.id file. + */ + private void persistContainerDatanodeDetails() throws IOException { + String dataNodeIDPath = HddsUtils.getDatanodeIdFilePath(conf); + File idPath = new File(dataNodeIDPath); + DatanodeDetails datanodeDetails = this.context.getParent() + .getDatanodeDetails(); + if (datanodeDetails != null && !idPath.exists()) { + ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath); + LOG.info("DatanodeDetails is persisted to {}", dataNodeIDPath); + } + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering init container state"); + } + + /** + * Called After exiting this state. + */ + @Override + public void onExit() { + LOG.trace("Exiting init container state"); + } + + /** + * Executes one or more tasks that is needed by this state. + * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + result = executor.submit(this); + } + + /** + * Wait for execute to finish. + * + * @param time - Time + * @param timeUnit - Unit of time. + */ + @Override + public DatanodeStateMachine.DatanodeStates await(long time, + TimeUnit timeUnit) throws InterruptedException, + ExecutionException, TimeoutException { + return result.get(time, timeUnit); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java new file mode 100644 index 0000000..7a8c17b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Class that implements handshake with SCM. + */ +public class RunningDatanodeState implements DatanodeState { + static final Logger + LOG = LoggerFactory.getLogger(RunningDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private CompletionService<EndpointStateMachine.EndPointStates> ecs; + + public RunningDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.connectionManager = connectionManager; + this.conf = conf; + this.context = context; + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering handshake task."); + } + + /** + * Called After exiting this state. + */ + @Override + public void onExit() { + LOG.trace("Exiting handshake task."); + } + + /** + * Executes one or more tasks that is needed by this state. + * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + ecs = new ExecutorCompletionService<>(executor); + for (EndpointStateMachine endpoint : connectionManager.getValues()) { + Callable<EndpointStateMachine.EndPointStates> endpointTask + = getEndPointTask(endpoint); + ecs.submit(endpointTask); + } + } + //TODO : Cache some of these tasks instead of creating them + //all the time. + private Callable<EndpointStateMachine.EndPointStates> + getEndPointTask(EndpointStateMachine endpoint) { + switch (endpoint.getState()) { + case GETVERSION: + return new VersionEndpointTask(endpoint, conf); + case REGISTER: + return RegisterEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setDatanodeDetails(context.getParent().getDatanodeDetails()) + .build(); + case HEARTBEAT: + return HeartbeatEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setDatanodeDetails(context.getParent().getDatanodeDetails()) + .setContext(context) + .build(); + case SHUTDOWN: + break; + default: + throw new IllegalArgumentException("Illegal Argument."); + } + return null; + } + + /** + * Computes the next state the container state machine must move to by looking + * at all the state of endpoints. + * <p> + * if any endpoint state has moved to Shutdown, either we have an + * unrecoverable error or we have been told to shutdown. Either case the + * datanode state machine should move to Shutdown state, otherwise we + * remain in the Running state. + * + * @return next container state. + */ + private DatanodeStateMachine.DatanodeStates + computeNextContainerState( + List<Future<EndpointStateMachine.EndPointStates>> results) { + for (Future<EndpointStateMachine.EndPointStates> state : results) { + try { + if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) { + // if any endpoint tells us to shutdown we move to shutdown state. + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error in executing end point task.", e); + } + } + return DatanodeStateMachine.DatanodeStates.RUNNING; + } + + /** + * Wait for execute to finish. + * + * @param duration - Time + * @param timeUnit - Unit of duration. + */ + @Override + public DatanodeStateMachine.DatanodeStates + await(long duration, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + int count = connectionManager.getValues().size(); + int returned = 0; + long timeLeft = timeUnit.toMillis(duration); + long startTime = Time.monotonicNow(); + List<Future<EndpointStateMachine.EndPointStates>> results = new + LinkedList<>(); + + while (returned < count && timeLeft > 0) { + Future<EndpointStateMachine.EndPointStates> result = + ecs.poll(timeLeft, TimeUnit.MILLISECONDS); + if (result != null) { + results.add(result); + returned++; + } + timeLeft = timeLeft - (Time.monotonicNow() - startTime); + } + return computeNextContainerState(results); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java new file mode 100644 index 0000000..6b8d16c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; +/** + This package contians files that guide the state transitions from + Init->Running->Shutdown for the datanode. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java new file mode 100644 index 0000000..5dee10f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.container.common.helpers + .DeletedContainerBlocksSummary; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine.EndPointStates; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZonedDateTime; +import java.util.concurrent.Callable; + +/** + * Heartbeat class for SCMs. + */ +public class HeartbeatEndpointTask + implements Callable<EndpointStateMachine.EndPointStates> { + static final Logger LOG = + LoggerFactory.getLogger(HeartbeatEndpointTask.class); + private final EndpointStateMachine rpcEndpoint; + private final Configuration conf; + private DatanodeDetailsProto datanodeDetailsProto; + private StateContext context; + + /** + * Constructs a SCM heart beat. + * + * @param conf Config. + */ + public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint, + Configuration conf, StateContext context) { + this.rpcEndpoint = rpcEndpoint; + this.conf = conf; + this.context = context; + } + + /** + * Get the container Node ID proto. + * + * @return ContainerNodeIDProto + */ + public DatanodeDetailsProto getDatanodeDetailsProto() { + return datanodeDetailsProto; + } + + /** + * Set container node ID proto. + * + * @param datanodeDetailsProto - the node id. + */ + public void setDatanodeDetailsProto(DatanodeDetailsProto + datanodeDetailsProto) { + this.datanodeDetailsProto = datanodeDetailsProto; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public EndpointStateMachine.EndPointStates call() throws Exception { + rpcEndpoint.lock(); + try { + Preconditions.checkState(this.datanodeDetailsProto != null); + + SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() + .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport(), + this.context.getContainerReportState()); + processResponse(reponse, datanodeDetailsProto); + rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); + rpcEndpoint.zeroMissedCount(); + } catch (IOException ex) { + rpcEndpoint.logIfNeeded(ex); + } finally { + rpcEndpoint.unlock(); + } + return rpcEndpoint.getState(); + } + + /** + * Returns a builder class for HeartbeatEndpointTask task. + * @return Builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Add this command to command processing Queue. + * + * @param response - SCMHeartbeat response. + */ + private void processResponse(SCMHeartbeatResponseProto response, + final DatanodeDetailsProto datanodeDetails) { + for (SCMCommandResponseProto commandResponseProto : response + .getCommandsList()) { + // Verify the response is indeed for this datanode. + Preconditions.checkState(commandResponseProto.getDatanodeUUID() + .equalsIgnoreCase(datanodeDetails.getUuid()), + "Unexpected datanode ID in the response."); + switch (commandResponseProto.getCmdType()) { + case sendContainerReport: + this.context.addCommand(SendContainerCommand.getFromProtobuf( + commandResponseProto.getSendReport())); + break; + case reregisterCommand: + if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM notification to register." + + " Interrupt HEARTBEAT and transit to REGISTER state."); + } + rpcEndpoint.setState(EndPointStates.REGISTER); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Illegal state {} found, expecting {}.", + rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT); + } + } + break; + case deleteBlocksCommand: + DeleteBlocksCommand db = DeleteBlocksCommand + .getFromProtobuf(commandResponseProto.getDeleteBlocksProto()); + if (!db.blocksTobeDeleted().isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug(DeletedContainerBlocksSummary + .getFrom(db.blocksTobeDeleted()) + .toString()); + } + this.context.addCommand(db); + } + break; + case closeContainerCommand: + CloseContainerCommand closeContainer = + CloseContainerCommand.getFromProtobuf( + commandResponseProto.getCloseContainerProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM container close request for container {}", + closeContainer.getContainerName()); + } + this.context.addCommand(closeContainer); + break; + default: + throw new IllegalArgumentException("Unknown response : " + + commandResponseProto.getCmdType().name()); + } + } + } + + /** + * Builder class for HeartbeatEndpointTask. + */ + public static class Builder { + private EndpointStateMachine endPointStateMachine; + private Configuration conf; + private DatanodeDetails datanodeDetails; + private StateContext context; + + /** + * Constructs the builder class. + */ + public Builder() { + } + + /** + * Sets the endpoint state machine. + * + * @param rpcEndPoint - Endpoint state machine. + * @return Builder + */ + public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { + this.endPointStateMachine = rpcEndPoint; + return this; + } + + /** + * Sets the Config. + * + * @param config - config + * @return Builder + */ + public Builder setConfig(Configuration config) { + this.conf = config; + return this; + } + + /** + * Sets the NodeID. + * + * @param dnDetails - NodeID proto + * @return Builder + */ + public Builder setDatanodeDetails(DatanodeDetails dnDetails) { + this.datanodeDetails = dnDetails; + return this; + } + + /** + * Sets the context. + * @param stateContext - State context. + * @return this. + */ + public Builder setContext(StateContext stateContext) { + this.context = stateContext; + return this; + } + + public HeartbeatEndpointTask build() { + if (endPointStateMachine == null) { + LOG.error("No endpoint specified."); + throw new IllegalArgumentException("A valid endpoint state machine is" + + " needed to construct HeartbeatEndpointTask task"); + } + + if (conf == null) { + LOG.error("No config specified."); + throw new IllegalArgumentException("A valid configration is needed to" + + " construct HeartbeatEndpointTask task"); + } + + if (datanodeDetails == null) { + LOG.error("No datanode specified."); + throw new IllegalArgumentException("A vaild Node ID is needed to " + + "construct HeartbeatEndpointTask task"); + } + + HeartbeatEndpointTask task = new HeartbeatEndpointTask(this + .endPointStateMachine, this.conf, this.context); + task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage()); + return task; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java new file mode 100644 index 0000000..6913896 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * Register a container with SCM. + */ +public final class RegisterEndpointTask implements + Callable<EndpointStateMachine.EndPointStates> { + static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class); + + private final EndpointStateMachine rpcEndPoint; + private final Configuration conf; + private Future<EndpointStateMachine.EndPointStates> result; + private DatanodeDetailsProto datanodeDetailsProto; + + /** + * Creates a register endpoint task. + * + * @param rpcEndPoint - endpoint + * @param conf - conf + */ + @VisibleForTesting + public RegisterEndpointTask(EndpointStateMachine rpcEndPoint, + Configuration conf) { + this.rpcEndPoint = rpcEndPoint; + this.conf = conf; + + } + + /** + * Get the DatanodeDetailsProto Proto. + * + * @return DatanodeDetailsProto + */ + public DatanodeDetailsProto getDatanodeDetailsProto() { + return datanodeDetailsProto; + } + + /** + * Set the contiainerNodeID Proto. + * + * @param datanodeDetailsProto - Container Node ID. + */ + public void setDatanodeDetailsProto( + DatanodeDetailsProto datanodeDetailsProto) { + this.datanodeDetailsProto = datanodeDetailsProto; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public EndpointStateMachine.EndPointStates call() throws Exception { + + if (getDatanodeDetailsProto() == null) { + LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " + + "shutting down the endpoint."); + return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); + } + + rpcEndPoint.lock(); + try { + + // TODO : Add responses to the command Queue. + rpcEndPoint.getEndPoint().register(datanodeDetailsProto, + conf.getStrings(ScmConfigKeys.OZONE_SCM_NAMES)); + EndpointStateMachine.EndPointStates nextState = + rpcEndPoint.getState().getNextState(); + rpcEndPoint.setState(nextState); + rpcEndPoint.zeroMissedCount(); + } catch (IOException ex) { + rpcEndPoint.logIfNeeded(ex + ); + } finally { + rpcEndPoint.unlock(); + } + + return rpcEndPoint.getState(); + } + + /** + * Returns a builder class for RegisterEndPoint task. + * + * @return Builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for RegisterEndPoint task. + */ + public static class Builder { + private EndpointStateMachine endPointStateMachine; + private Configuration conf; + private DatanodeDetails datanodeDetails; + + /** + * Constructs the builder class. + */ + public Builder() { + } + + /** + * Sets the endpoint state machine. + * + * @param rpcEndPoint - Endpoint state machine. + * @return Builder + */ + public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { + this.endPointStateMachine = rpcEndPoint; + return this; + } + + /** + * Sets the Config. + * + * @param config - config + * @return Builder. + */ + public Builder setConfig(Configuration config) { + this.conf = config; + return this; + } + + /** + * Sets the NodeID. + * + * @param dnDetails - NodeID proto + * @return Builder + */ + public Builder setDatanodeDetails(DatanodeDetails dnDetails) { + this.datanodeDetails = dnDetails; + return this; + } + + public RegisterEndpointTask build() { + if (endPointStateMachine == null) { + LOG.error("No endpoint specified."); + throw new IllegalArgumentException("A valid endpoint state machine is" + + " needed to construct RegisterEndPoint task"); + } + + if (conf == null) { + LOG.error("No config specified."); + throw new IllegalArgumentException("A valid configration is needed to" + + " construct RegisterEndpoint task"); + } + + if (datanodeDetails == null) { + LOG.error("No datanode specified."); + throw new IllegalArgumentException("A vaild Node ID is needed to " + + "construct RegisterEndpoint task"); + } + + RegisterEndpointTask task = new RegisterEndpointTask(this + .endPointStateMachine, this.conf); + task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage()); + return task; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java new file mode 100644 index 0000000..b048ee5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.protocol.VersionResponse; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * Task that returns version. + */ +public class VersionEndpointTask implements + Callable<EndpointStateMachine.EndPointStates> { + private final EndpointStateMachine rpcEndPoint; + private final Configuration configuration; + + public VersionEndpointTask(EndpointStateMachine rpcEndPoint, + Configuration conf) { + this.rpcEndPoint = rpcEndPoint; + this.configuration = conf; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public EndpointStateMachine.EndPointStates call() throws Exception { + rpcEndPoint.lock(); + try{ + SCMVersionResponseProto versionResponse = + rpcEndPoint.getEndPoint().getVersion(null); + rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse)); + + EndpointStateMachine.EndPointStates nextState = + rpcEndPoint.getState().getNextState(); + rpcEndPoint.setState(nextState); + rpcEndPoint.zeroMissedCount(); + } catch (IOException ex) { + rpcEndPoint.logIfNeeded(ex); + } finally { + rpcEndPoint.unlock(); + } + return rpcEndPoint.getState(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java new file mode 100644 index 0000000..1122598 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; +/** + This package contains code for RPC endpoints transitions. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java new file mode 100644 index 0000000..92c953f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.states; http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java new file mode 100644 index 0000000..50e45b4 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; + +/** + * Creates a netty server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServer implements XceiverServerSpi { + private static final Logger + LOG = LoggerFactory.getLogger(XceiverServer.class); + private int port; + private final ContainerDispatcher storageContainer; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel channel; + + /** + * Constructs a netty server class. + * + * @param conf - Configuration + */ + public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf, + ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(conf); + + this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + // Get an available port on current node and + // use that as the container port + if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket()) { + socket.setReuseAddress(true); + SocketAddress address = new InetSocketAddress(0); + socket.bind(address); + this.port = socket.getLocalPort(); + LOG.info("Found a free port for the server : {}", this.port); + } catch (IOException e) { + LOG.error("Unable find a random free port for the server, " + + "fallback to use default port {}", this.port, e); + } + } + datanodeDetails.setContainerPort(port); + this.storageContainer = dispatcher; + } + + @Override + public int getIPCPort() { + return this.port; + } + + /** + * Returns the Replication type supported by this end-point. + * + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + @Override + public HddsProtos.ReplicationType getServerType() { + return HddsProtos.ReplicationType.STAND_ALONE; + } + + @Override + public void start() throws IOException { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + channel = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new XceiverServerInitializer(storageContainer)) + .bind(port) + .syncUninterruptibly() + .channel(); + } + + @Override + public void stop() { + if (storageContainer != null) { + storageContainer.shutdown(); + } + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (channel != null) { + channel.close().awaitUninterruptibly(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java new file mode 100644 index 0000000..5947dde --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Netty server handlers that respond to Network events. + */ +public class XceiverServerHandler extends + SimpleChannelInboundHandler<ContainerCommandRequestProto> { + + static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class); + private final ContainerDispatcher dispatcher; + + /** + * Constructor for server handler. + * @param dispatcher - Dispatcher interface + */ + public XceiverServerHandler(ContainerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + /** + * <strong>Please keep in mind that this method will be renamed to {@code + * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong> + * <p> + * Is called for each message of type {@link ContainerCommandRequestProto}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link + * SimpleChannelInboundHandler} belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ + @Override + public void channelRead0(ChannelHandlerContext ctx, + ContainerCommandRequestProto msg) throws + Exception { + ContainerCommandResponseProto response = this.dispatcher.dispatch(msg); + LOG.debug("Writing the reponse back to client."); + ctx.writeAndFlush(response); + + } + + /** + * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} + * Sub-classes may override this method to change behavior. + * + * @param ctx - Channel Handler Context + * @param cause - Exception + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + LOG.error("An exception caught in the pipeline : " + cause.toString()); + super.exceptionCaught(ctx, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java new file mode 100644 index 0000000..78ba26b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.base.Preconditions; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; + +/** + * Creates a channel for the XceiverServer. + */ +public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{ + private final ContainerDispatcher dispatcher; + public XceiverServerInitializer(ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(dispatcher); + this.dispatcher = dispatcher; + } + + /** + * This method will be called once the Channel is registered. After + * the method returns this instance will be removed from the {@link + * ChannelPipeline} + * + * @param ch the which was registered. + * @throws Exception is thrown if an error occurs. In that case the channel + * will be closed. + */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ProtobufVarint32FrameDecoder()); + pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto + .getDefaultInstance())); + pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast(new ProtobufEncoder()); + pipeline.addLast(new XceiverServerHandler(dispatcher)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java new file mode 100644 index 0000000..dad9e9f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.io.IOException; + +/** A server endpoint that acts as the communication layer for Ozone + * containers. */ +public interface XceiverServerSpi { + /** Starts the server. */ + void start() throws IOException; + + /** Stops a running server. */ + void stop(); + + /** Get server IPC port. */ + int getIPCPort(); + + /** + * Returns the Replication type supported by this end-point. + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + HddsProtos.ReplicationType getServerType(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java new file mode 100644 index 0000000..59c96f1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +/** + * This package contains classes for the server of the storage container + * protocol. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java new file mode 100644 index 0000000..1a89e44 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .WriteChunkRequestProto; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.TransactionContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; + +/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. + * + * The stateMachine is responsible for handling different types of container + * requests. The container requests can be divided into readonly and write + * requests. + * + * Read only requests are classified in + * {@link org.apache.hadoop.hdds.scm.XceiverClientRatis#isReadOnly} + * and these readonly requests are replied from the {@link #query(Message)}. + * + * The write requests can be divided into requests with user data + * (WriteChunkRequest) and other request without user data. + * + * Inorder to optimize the write throughput, the writeChunk request is + * processed in 2 phases. The 2 phases are divided in + * {@link #startTransaction(RaftClientRequest)}, in the first phase the user + * data is written directly into the state machine via + * {@link #writeStateMachineData} and in the second phase the + * transaction is committed via {@link #applyTransaction(TransactionContext)} + * + * For the requests with no stateMachine data, the transaction is directly + * committed through + * {@link #applyTransaction(TransactionContext)} + * + * There are 2 ordering operation which are enforced right now in the code, + * 1) Write chunk operation are executed after the create container operation, + * the write chunk operation will fail otherwise as the container still hasn't + * been created. Hence the create container operation has been split in the + * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing + * the calls in {@link #writeStateMachineData} + * + * 2) Write chunk commit operation is executed after write chunk state machine + * operation. This will ensure that commit operation is sync'd with the state + * machine operation. + * */ +public class ContainerStateMachine extends BaseStateMachine { + static final Logger LOG = LoggerFactory.getLogger( + ContainerStateMachine.class); + private final SimpleStateMachineStorage storage + = new SimpleStateMachineStorage(); + private final ContainerDispatcher dispatcher; + private ThreadPoolExecutor writeChunkExecutor; + private final ConcurrentHashMap<Long, CompletableFuture<Message>> + writeChunkFutureMap; + private final ConcurrentHashMap<String, CompletableFuture<Message>> + createContainerFutureMap; + + ContainerStateMachine(ContainerDispatcher dispatcher, + ThreadPoolExecutor writeChunkExecutor) { + this.dispatcher = dispatcher; + this.writeChunkExecutor = writeChunkExecutor; + this.writeChunkFutureMap = new ConcurrentHashMap<>(); + this.createContainerFutureMap = new ConcurrentHashMap<>(); + } + + @Override + public StateMachineStorage getStateMachineStorage() { + return storage; + } + + @Override + public void initialize( + RaftPeerId id, RaftProperties properties, RaftStorage raftStorage) + throws IOException { + super.initialize(id, properties, raftStorage); + storage.init(raftStorage); + // TODO handle snapshots + + // TODO: Add a flag that tells you that initialize has been called. + // Check with Ratis if this feature is done in Ratis. + } + + @Override + public TransactionContext startTransaction(RaftClientRequest request) + throws IOException { + final ContainerCommandRequestProto proto = + getRequestProto(request.getMessage().getContent()); + + final SMLogEntryProto log; + if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) { + final WriteChunkRequestProto write = proto.getWriteChunk(); + // create the state machine data proto + final WriteChunkRequestProto dataWriteChunkProto = + WriteChunkRequestProto + .newBuilder(write) + .setStage(ContainerProtos.Stage.WRITE_DATA) + .build(); + ContainerCommandRequestProto dataContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(dataWriteChunkProto) + .build(); + + // create the log entry proto + final WriteChunkRequestProto commitWriteChunkProto = + WriteChunkRequestProto.newBuilder() + .setPipeline(write.getPipeline()) + .setKeyName(write.getKeyName()) + .setChunkData(write.getChunkData()) + // skipping the data field as it is + // already set in statemachine data proto + .setStage(ContainerProtos.Stage.COMMIT_DATA) + .build(); + ContainerCommandRequestProto commitContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(commitWriteChunkProto) + .build(); + + log = SMLogEntryProto.newBuilder() + .setData(getShadedByteString(commitContainerCommandProto)) + .setStateMachineData(getShadedByteString(dataContainerCommandProto)) + .build(); + } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) { + log = SMLogEntryProto.newBuilder() + .setData(request.getMessage().getContent()) + .setStateMachineData(request.getMessage().getContent()) + .build(); + } else { + log = SMLogEntryProto.newBuilder() + .setData(request.getMessage().getContent()) + .build(); + } + return new TransactionContextImpl(this, request, log); + } + + private ByteString getShadedByteString(ContainerCommandRequestProto proto) { + return ShadedProtoUtil.asShadedByteString(proto.toByteArray()); + } + + private ContainerCommandRequestProto getRequestProto(ByteString request) + throws InvalidProtocolBufferException { + return ContainerCommandRequestProto.parseFrom( + ShadedProtoUtil.asByteString(request)); + } + + private Message runCommand(ContainerCommandRequestProto requestProto) { + LOG.trace("dispatch {}", requestProto); + ContainerCommandResponseProto response = dispatcher.dispatch(requestProto); + LOG.trace("response {}", response); + return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()); + } + + private CompletableFuture<Message> handleWriteChunk( + ContainerCommandRequestProto requestProto, long entryIndex) { + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + String containerName = write.getPipeline().getContainerName(); + CompletableFuture<Message> future = + createContainerFutureMap.get(containerName); + CompletableFuture<Message> writeChunkFuture; + if (future != null) { + writeChunkFuture = future.thenApplyAsync( + v -> runCommand(requestProto), writeChunkExecutor); + } else { + writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), writeChunkExecutor); + } + writeChunkFutureMap.put(entryIndex, writeChunkFuture); + return writeChunkFuture; + } + + private CompletableFuture<Message> handleCreateContainer( + ContainerCommandRequestProto requestProto) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap. + computeIfAbsent(containerName, k -> new CompletableFuture<>()); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } + + @Override + public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) { + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(entry.getSmLogEntry().getStateMachineData()); + ContainerProtos.Type cmdType = requestProto.getCmdType(); + switch (cmdType) { + case CreateContainer: + return handleCreateContainer(requestProto); + case WriteChunk: + return handleWriteChunk(requestProto, entry.getIndex()); + default: + throw new IllegalStateException("Cmd Type:" + cmdType + + " should not have state machine data"); + } + } catch (IOException e) { + return completeExceptionally(e); + } + } + + @Override + public CompletableFuture<Message> query(Message request) { + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(request.getContent()); + return CompletableFuture.completedFuture(runCommand(requestProto)); + } catch (IOException e) { + return completeExceptionally(e); + } + } + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + try { + ContainerCommandRequestProto requestProto = + getRequestProto(trx.getSMLogEntry().getData()); + ContainerProtos.Type cmdType = requestProto.getCmdType(); + + if (cmdType == ContainerProtos.Type.WriteChunk) { + WriteChunkRequestProto write = requestProto.getWriteChunk(); + // the data field has already been removed in start Transaction + Preconditions.checkArgument(!write.hasData()); + CompletableFuture<Message> stateMachineFuture = + writeChunkFutureMap.remove(trx.getLogEntry().getIndex()); + return stateMachineFuture + .thenComposeAsync(v -> + CompletableFuture.completedFuture(runCommand(requestProto))); + } else { + Message message = runCommand(requestProto); + if (cmdType == ContainerProtos.Type.CreateContainer) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap.remove(containerName).complete(message); + } + return CompletableFuture.completedFuture(message); + } + } catch (IOException e) { + return completeExceptionally(e); + } + } + + private static <T> CompletableFuture<T> completeExceptionally(Exception e) { + final CompletableFuture<T> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java new file mode 100644 index 0000000..4bd55f1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Creates a ratis server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServerRatis implements XceiverServerSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); + private final int port; + private final RaftServer server; + private ThreadPoolExecutor writeChunkExecutor; + + private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir, + ContainerDispatcher dispatcher, Configuration conf) throws IOException { + + final String rpcType = conf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); + final int raftSegmentSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); + final int raftSegmentPreallocatedSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT); + final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; + final int numWriteChunkThreads = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); + + Objects.requireNonNull(dd, "id == null"); + this.port = port; + RaftProperties serverProperties = newRaftProperties(rpc, port, + storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize); + + writeChunkExecutor = + new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, + 100, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + ContainerStateMachine stateMachine = + new ContainerStateMachine(dispatcher, writeChunkExecutor); + this.server = RaftServer.newBuilder() + .setServerId(RatisHelper.toRaftPeerId(dd)) + .setGroup(RatisHelper.emptyRaftGroup()) + .setProperties(serverProperties) + .setStateMachine(stateMachine) + .build(); + } + + private static RaftProperties newRaftProperties( + RpcType rpc, int port, String storageDir, int scmChunkSize, + int raftSegmentSize, int raftSegmentPreallocatedSize) { + final RaftProperties properties = new RaftProperties(); + RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); + RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.setWriteBufferSize(properties, + SizeInBytes.valueOf(scmChunkSize)); + RaftServerConfigKeys.Log.setPreallocatedSize(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties, + SizeInBytes.valueOf(raftSegmentSize)); + RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); + RaftConfigKeys.Rpc.setType(properties, rpc); + + RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Rpc.setTimeoutMin(properties, + TimeDuration.valueOf(800, TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS)); + if (rpc == SupportedRpcType.GRPC) { + GrpcConfigKeys.Server.setPort(properties, port); + } else if (rpc == SupportedRpcType.NETTY) { + NettyConfigKeys.Server.setPort(properties, port); + } + return properties; + } + + public static XceiverServerRatis newXceiverServerRatis( + DatanodeDetails datanodeDetails, Configuration ozoneConf, + ContainerDispatcher dispatcher) throws IOException { + final String ratisDir = File.separator + "ratis"; + int localPort = ozoneConf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); + String storageDir = ozoneConf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); + + if (Strings.isNullOrEmpty(storageDir)) { + storageDir = ozoneConf.get(OzoneConfigKeys + .OZONE_METADATA_DIRS); + Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " + + "cannot be null, Please check your configs."); + storageDir = storageDir.concat(ratisDir); + LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " + + "storage under {}. It is a good idea to map this to an SSD disk.", + storageDir); + } + + // Get an available port on current node and + // use that as the container port + if (ozoneConf.getBoolean(OzoneConfigKeys + .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket()) { + socket.setReuseAddress(true); + SocketAddress address = new InetSocketAddress(0); + socket.bind(address); + localPort = socket.getLocalPort(); + LOG.info("Found a free port for the server : {}", localPort); + // If we have random local ports configured this means that it + // probably running under MiniOzoneCluster. Ratis locks the storage + // directories, so we need to pass different local directory for each + // local instance. So we map ratis directories under datanode ID. + storageDir = + storageDir.concat(File.separator + + datanodeDetails.getUuidString()); + } catch (IOException e) { + LOG.error("Unable find a random free port for the server, " + + "fallback to use default port {}", localPort, e); + } + } + datanodeDetails.setRatisPort(localPort); + return new XceiverServerRatis(datanodeDetails, localPort, storageDir, + dispatcher, ozoneConf); + } + + @Override + public void start() throws IOException { + LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), + server.getId(), getIPCPort()); + writeChunkExecutor.prestartAllCoreThreads(); + server.start(); + } + + @Override + public void stop() { + try { + writeChunkExecutor.shutdown(); + server.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getIPCPort() { + return port; + } + + /** + * Returns the Replication type supported by this end-point. + * + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + @Override + public HddsProtos.ReplicationType getServerType() { + return HddsProtos.ReplicationType.RATIS; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java new file mode 100644 index 0000000..8debfe0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +/** + * This package contains classes for the server implementation + * using Apache Ratis + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java new file mode 100644 index 0000000..6ae45b6 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.utils; + +import com.google.common.base.Preconditions; +import org.apache.commons.collections.MapIterator; +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * container cache is a LRUMap that maintains the DB handles. + */ +public final class ContainerCache extends LRUMap { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCache.class); + private final Lock lock = new ReentrantLock(); + private static ContainerCache cache; + private static final float LOAD_FACTOR = 0.75f; + /** + * Constructs a cache that holds DBHandle references. + */ + private ContainerCache(int maxSize, float loadFactor, boolean + scanUntilRemovable) { + super(maxSize, loadFactor, scanUntilRemovable); + } + + /** + * Return a singleton instance of {@link ContainerCache} + * that holds the DB handlers. + * + * @param conf - Configuration. + * @return A instance of {@link ContainerCache}. + */ + public synchronized static ContainerCache getInstance(Configuration conf) { + if (cache == null) { + int cacheSize = conf.getInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, + OzoneConfigKeys.OZONE_CONTAINER_CACHE_DEFAULT); + cache = new ContainerCache(cacheSize, LOAD_FACTOR, true); + } + return cache; + } + + /** + * Closes a db instance. + * + * @param container - name of the container to be closed. + * @param db - db instance to close. + */ + private void closeDB(String container, MetadataStore db) { + if (db != null) { + try { + db.close(); + } catch (IOException e) { + LOG.error("Error closing DB. Container: " + container, e); + } + } + } + + /** + * Closes all the db instances and resets the cache. + */ + public void shutdownCache() { + lock.lock(); + try { + // iterate the cache and close each db + MapIterator iterator = cache.mapIterator(); + while (iterator.hasNext()) { + iterator.next(); + MetadataStore db = (MetadataStore) iterator.getValue(); + closeDB(iterator.getKey().toString(), db); + } + // reset the cache + cache.clear(); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + protected boolean removeLRU(LinkEntry entry) { + lock.lock(); + try { + MetadataStore db = (MetadataStore) entry.getValue(); + closeDB(entry.getKey().toString(), db); + } finally { + lock.unlock(); + } + return true; + } + + /** + * Returns a DB handle if available, create the handler otherwise. + * + * @param containerName - Name of the container. + * @return MetadataStore. + */ + public MetadataStore getDB(String containerName, String containerDBPath) + throws IOException { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + lock.lock(); + try { + MetadataStore db = (MetadataStore) this.get(containerName); + + if (db == null) { + db = MetadataStoreBuilder.newBuilder() + .setDbFile(new File(containerDBPath)) + .setCreateIfMissing(false) + .build(); + this.put(containerName, db); + } + return db; + } catch (Exception e) { + LOG.error("Error opening DB. Container:{} ContainerPath:{}", + containerName, containerDBPath, e); + throw e; + } finally { + lock.unlock(); + } + } + + /** + * Remove a DB handler from cache. + * + * @param containerName - Name of the container. + */ + public void removeDB(String containerName) { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + lock.lock(); + try { + MetadataStore db = (MetadataStore)this.get(containerName); + closeDB(containerName, db); + this.remove(containerName); + } finally { + lock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java new file mode 100644 index 0000000..08264f0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.utils; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org