This is an automated email from the ASF dual-hosted git repository. licheng pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 1f3ef369755a4d2363506cef545c88c144f14fc7 Author: Nandakumar <na...@apache.org> AuthorDate: Tue May 26 13:26:16 2020 +0530 HDDS-3186. Introduce generic SCMRatisRequest and SCMRatisResponse. (#959) * HDDS-3186. Initial version. * HDDS-3186. Additional changes. --- .../server-scm/dev-support/findbugsExcludeFile.xml | 21 ++ hadoop-hdds/server-scm/pom.xml | 38 ++ .../hdds/scm/container/ContainerManagerImpl.java | 282 +++++++++++++++ .../hdds/scm/container/ContainerManagerV2.java | 189 ++++++++++ .../scm/container/ContainerStateManagerImpl.java | 397 +++++++++++++++++++++ .../scm/container/ContainerStateManagerV2.java | 75 ++++ .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 155 ++++++++ .../apache/hadoop/hdds/scm/ha/ReflectionUtil.java | 67 ++++ .../hadoop/hdds/scm/ha/SCMHAConfiguration.java | 225 ++++++++++++ .../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java | 93 +++++ .../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 76 ++++ .../apache/hadoop/hdds/scm/ha/SCMRatisRequest.java | 147 ++++++++ .../hadoop/hdds/scm/ha/SCMRatisResponse.java | 127 +++++++ .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 109 ++++++ .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 92 +++++ .../apache/hadoop/hdds/scm/metadata/Replicate.java | 33 ++ .../src/main/proto/SCMRatisProtocol.proto | 46 +++ 17 files changed, 2172 insertions(+) diff --git a/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml new file mode 100644 index 0000000..3571a89 --- /dev/null +++ b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml @@ -0,0 +1,21 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<FindBugsFilter> + <Match> + <Package name="org.apache.hadoop.hdds.protocol.proto"/> + </Match> +</FindBugsFilter> diff --git a/hadoop-hdds/server-scm/pom.xml b/hadoop-hdds/server-scm/pom.xml index dcbc42a..8c17aae 100644 --- a/hadoop-hdds/server-scm/pom.xml +++ b/hadoop-hdds/server-scm/pom.xml @@ -128,6 +128,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>hadoop-hdds-hadoop-dependency-test</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>compile</scope> + </dependency> </dependencies> <build> <plugins> @@ -163,6 +168,39 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <protocVersion>${protobuf.version}</protocVersion> + <imports> + <param> + ${basedir}/src/main/proto + </param> + </imports> + <source> + <directory>${basedir}/src/main/proto</directory> + <includes> + <include>SCMRatisProtocol.proto</include> + </includes> + </source> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>com.github.spotbugs</groupId> + <artifactId>spotbugs-maven-plugin</artifactId> + <configuration> + <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile> + </configuration> + </plugin> </plugins> <testResources> <testResource> diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java new file mode 100644 index 0000000..0404530 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO: Add javadoc. + */ +public class ContainerManagerImpl implements ContainerManagerV2 { + + /** + * + */ + private static final Logger LOG = LoggerFactory.getLogger( + ContainerManagerImpl.class); + + /** + * + */ + private final ReadWriteLock lock; + + /** + * + */ + private final PipelineManager pipelineManager; + + /** + * + */ + private final ContainerStateManagerV2 containerStateManager; + + /** + * + */ + public ContainerManagerImpl( + // Introduce builder for this class? 
+ final Configuration conf, final PipelineManager pipelineManager, + final SCMHAManager scmhaManager, + final Table<ContainerID, ContainerInfo> containerStore) + throws IOException { + this.lock = new ReentrantReadWriteLock(); + this.pipelineManager = pipelineManager; + this.containerStateManager = ContainerStateManagerImpl.newBuilder() + .setConfiguration(conf) + .setPipelineManager(pipelineManager) + .setRatisServer(scmhaManager.getRatisServer()) + .setContainerStore(containerStore) + .build(); + } + + @Override + public Set<ContainerID> getContainerIDs() { + lock.readLock().lock(); + try { + return containerStateManager.getContainerIDs(); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Set<ContainerInfo> getContainers() { + lock.readLock().lock(); + try { + return containerStateManager.getContainerIDs().stream().map(id -> { + try { + return containerStateManager.getContainer(id); + } catch (ContainerNotFoundException e) { + // How can this happen? o_O + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toSet()); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public ContainerInfo getContainer(final ContainerID containerID) + throws ContainerNotFoundException { + lock.readLock().lock(); + try { + return containerStateManager.getContainer(containerID); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Set<ContainerInfo> getContainers(final LifeCycleState state) { + lock.readLock().lock(); + try { + return containerStateManager.getContainerIDs(state).stream().map(id -> { + try { + return containerStateManager.getContainer(id); + } catch (ContainerNotFoundException e) { + // How can this happen? 
o_O + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toSet()); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public boolean exists(final ContainerID containerID) { + lock.readLock().lock(); + try { + return (containerStateManager.getContainer(containerID) != null); + } catch (ContainerNotFoundException ex) { + return false; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public List<ContainerInfo> listContainers(final ContainerID startID, + final int count) { + lock.readLock().lock(); + try { + final long startId = startID == null ? 0 : startID.getId(); + final List<ContainerID> containersIds = + new ArrayList<>(containerStateManager.getContainerIDs()); + Collections.sort(containersIds); + return containersIds.stream() + .filter(id -> id.getId() > startId) + .limit(count) + .map(id -> { + try { + return containerStateManager.getContainer(id); + } catch (ContainerNotFoundException ex) { + // This can never happen, as we hold lock no one else can remove + // the container after we got the container ids. + LOG.warn("Container Missing.", ex); + return null; + } + }).collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public ContainerInfo allocateContainer(final ReplicationType type, + final ReplicationFactor replicationFactor, final String owner) + throws IOException { + lock.writeLock().lock(); + try { + final List<Pipeline> pipelines = pipelineManager + .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN); + + if (pipelines.isEmpty()) { + throw new IOException("Could not allocate container. 
Cannot get any" + + " matching pipeline for Type:" + type + ", Factor:" + + replicationFactor + ", State:PipelineState.OPEN"); + } + + final ContainerID containerID = containerStateManager + .getNextContainerID(); + final Pipeline pipeline = pipelines.get( + (int) containerID.getId() % pipelines.size()); + + final ContainerInfoProto containerInfo = ContainerInfoProto.newBuilder() + .setState(LifeCycleState.OPEN) + .setPipelineID(pipeline.getId().getProtobuf()) + .setUsedBytes(0) + .setNumberOfKeys(0) + .setStateEnterTime(Time.now()) + .setOwner(owner) + .setContainerID(containerID.getId()) + .setDeleteTransactionId(0) + .setReplicationFactor(pipeline.getFactor()) + .setReplicationType(pipeline.getType()) + .build(); + containerStateManager.addContainer(containerInfo); + if (LOG.isTraceEnabled()) { + LOG.trace("New container allocated: {}", containerInfo); + } + return containerStateManager.getContainer(containerID); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void deleteContainer(final ContainerID containerID) + throws ContainerNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public void updateContainerState(final ContainerID containerID, + final LifeCycleEvent event) + throws ContainerNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public Set<ContainerReplica> getContainerReplicas( + final ContainerID containerID) throws ContainerNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public void updateContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public void removeContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException, 
ContainerReplicaNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public void updateDeleteTransactionId( + final Map<ContainerID, Long> deleteTransactionMap) throws IOException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public ContainerInfo getMatchingContainer(final long size, final String owner, + final Pipeline pipeline, final List<ContainerID> excludedContainerIDS) { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public void notifyContainerReportProcessing(final boolean isFullReport, + final boolean success) { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java new file mode 100644 index 0000000..37c7b70 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + +/** + * TODO: Add extensive javadoc. + * + * ContainerManager class contains the mapping from a name to a pipeline + * mapping. This is used by SCM when allocating new locations and when + * looking up a key. + */ +public interface ContainerManagerV2 extends Closeable { + + + /** + * Returns all the container Ids managed by ContainerManager. + * + * @return Set of ContainerID + */ + Set<ContainerID> getContainerIDs(); + + /** + * Returns all the containers managed by ContainerManager. + * + * @return Set of ContainerInfo + */ + Set<ContainerInfo> getContainers(); + + /** + * Returns all the containers which are in the specified state. + * + * @return Set of ContainerInfo + */ + Set<ContainerInfo> getContainers(LifeCycleState state); + + /** + * Returns the ContainerInfo from the container ID. + * + */ + ContainerInfo getContainer(ContainerID containerID) + throws ContainerNotFoundException; + + boolean exists(ContainerID containerID); + + /** + * Returns containers under certain conditions. + * Search container IDs from start ID(exclusive), + * The max size of the searching range cannot exceed the + * value of count. + * + * @param startID start containerID, >=0, + * start searching at the head if 0.
+ * @param count count must be >= 0 + * Usually the count will be replaced with a very big + * value instead of being unlimited in case the db is very big. + * + * @return a list of containers. + */ + List<ContainerInfo> listContainers(ContainerID startID, int count); + + /** + * Allocates a new container for a given keyName and replication factor. + * + * @param replicationFactor - replication factor of the container. + * @param owner + * @return - ContainerInfo. + * @throws IOException + */ + ContainerInfo allocateContainer(ReplicationType type, + ReplicationFactor replicationFactor, + String owner) throws IOException; + + /** + * Deletes a container from SCM. + * + * @param containerID - Container ID + * @throws IOException + */ + void deleteContainer(ContainerID containerID) + throws ContainerNotFoundException; + + /** + * Update container state. + * @param containerID - Container ID + * @param event - container life cycle event + * @throws IOException + */ + void updateContainerState(ContainerID containerID, + LifeCycleEvent event) + throws ContainerNotFoundException; + + /** + * Returns the latest list of replicas for given containerId. + * + * @param containerID Container ID + * @return Set of ContainerReplica + */ + Set<ContainerReplica> getContainerReplicas(ContainerID containerID) + throws ContainerNotFoundException; + + /** + * Adds a container Replica for the given Container. + * + * @param containerID Container ID + * @param replica ContainerReplica + */ + void updateContainerReplica(ContainerID containerID, ContainerReplica replica) + throws ContainerNotFoundException; + + /** + * Remove a container Replica from a given Container. + * + * @param containerID Container ID + * @param replica ContainerReplica + * @return True if dataNode is removed successfully else false.
+ */ + void removeContainerReplica(ContainerID containerID, ContainerReplica replica) + throws ContainerNotFoundException, ContainerReplicaNotFoundException; + + /** + * Update deleteTransactionId according to deleteTransactionMap. + * + * @param deleteTransactionMap Maps the containerId to latest delete + * transaction id for the container. + * @throws IOException + */ + void updateDeleteTransactionId(Map<ContainerID, Long> deleteTransactionMap) + throws IOException; + + /** + * Returns ContainerInfo which matches the requirements. + * @param size - the amount of space required in the container + * @param owner - the user which requires space in its owned container + * @param pipeline - pipeline to which the container should belong + * @return ContainerInfo for the matching container. + */ + default ContainerInfo getMatchingContainer(long size, String owner, + Pipeline pipeline) { + return getMatchingContainer(size, owner, pipeline, Collections.emptyList()); + } + + /** + * Returns ContainerInfo which matches the requirements. + * @param size - the amount of space required in the container + * @param owner - the user which requires space in its owned container + * @param pipeline - pipeline to which the container should belong. + * @param excludedContainerIDS - containerIds to be excluded. + * @return ContainerInfo for the matching container. + */ + ContainerInfo getMatchingContainer(long size, String owner, + Pipeline pipeline, + List<ContainerID> excludedContainerIDS); + + /** + * Once after report processor handler completes, call this to notify + * container manager to increment metrics. + * @param isFullReport + * @param success + */ + // Is it possible to remove this from the Interface? 
+ void notifyContainerReportProcessing(boolean isFullReport, boolean success); +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java new file mode 100644 index 0000000..16fe340 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container; + +import java.io.IOException; + +import java.lang.reflect.Proxy; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.states.ContainerState; +import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.ozone.common.statemachine.StateMachine; + +/** + * TODO: Add javadoc. 
+ */ +public final class ContainerStateManagerImpl + implements ContainerStateManagerV2 { + + /* ********************************************************************** + * Container Life Cycle * + * * + * Event and State Transition Mapping: * + * * + * State: OPEN ----------------> CLOSING * + * Event: FINALIZE * + * * + * State: CLOSING ----------------> QUASI_CLOSED * + * Event: QUASI_CLOSE * + * * + * State: CLOSING ----------------> CLOSED * + * Event: CLOSE * + * * + * State: QUASI_CLOSED ----------------> CLOSED * + * Event: FORCE_CLOSE * + * * + * State: CLOSED ----------------> DELETING * + * Event: DELETE * + * * + * State: DELETING ----------------> DELETED * + * Event: CLEANUP * + * * + * * + * Container State Flow: * + * * + * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] * + * (FINALIZE) | (QUASI_CLOSE) | * + * | | * + * | | * + * (CLOSE) | (FORCE_CLOSE) | * + * | | * + * | | * + * +--------->[CLOSED]<--------+ * + * | * + * (DELETE)| * + * | * + * | * + * [DELETING] * + * | * + * (CLEANUP) | * + * | * + * V * + * [DELETED] * + * * + ************************************************************************/ + + /** + * + */ + private static final Logger LOG = LoggerFactory.getLogger( + ContainerStateManagerImpl.class); + + /** + * + */ + private final long containerSize; + + /** + * + */ + private final AtomicLong nextContainerID; + + /** + * + */ + private final ContainerStateMap containers; + + /** + * + */ + private final PipelineManager pipelineManager; + + /** + * + */ + private Table<ContainerID, ContainerInfo> containerStore; + + /** + * + */ + private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine; + + /** + * + */ + private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap; + + /** + * + */ + private ContainerStateManagerImpl(final Configuration conf, + final PipelineManager pipelineManager, + final Table<ContainerID, ContainerInfo> containerStore) + throws IOException { + this.pipelineManager = 
pipelineManager; + this.containerStore = containerStore; + this.stateMachine = newStateMachine(); + this.containerSize = getConfiguredContainerSize(conf); + this.nextContainerID = new AtomicLong(); + this.containers = new ContainerStateMap(); + this.lastUsedMap = new ConcurrentHashMap<>(); + + initialize(); + } + + /** + * + */ + private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() { + + final Set<LifeCycleState> finalStates = new HashSet<>(); + + // These are the steady states of a container. + finalStates.add(LifeCycleState.OPEN); + finalStates.add(LifeCycleState.CLOSED); + finalStates.add(LifeCycleState.DELETED); + + final StateMachine<LifeCycleState, LifeCycleEvent> containerLifecycleSM = + new StateMachine<>(LifeCycleState.OPEN, finalStates); + + containerLifecycleSM.addTransition(LifeCycleState.OPEN, + LifeCycleState.CLOSING, + LifeCycleEvent.FINALIZE); + + containerLifecycleSM.addTransition(LifeCycleState.CLOSING, + LifeCycleState.QUASI_CLOSED, + LifeCycleEvent.QUASI_CLOSE); + + containerLifecycleSM.addTransition(LifeCycleState.CLOSING, + LifeCycleState.CLOSED, + LifeCycleEvent.CLOSE); + + containerLifecycleSM.addTransition(LifeCycleState.QUASI_CLOSED, + LifeCycleState.CLOSED, + LifeCycleEvent.FORCE_CLOSE); + + containerLifecycleSM.addTransition(LifeCycleState.CLOSED, + LifeCycleState.DELETING, + LifeCycleEvent.DELETE); + + containerLifecycleSM.addTransition(LifeCycleState.DELETING, + LifeCycleState.DELETED, + LifeCycleEvent.CLEANUP); + + return containerLifecycleSM; + } + + /** + * + */ + private long getConfiguredContainerSize(final Configuration conf) { + return (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, + StorageUnit.BYTES); + } + + /** + * + */ + private void initialize() throws IOException { + TableIterator<ContainerID, ? 
extends KeyValue<ContainerID, ContainerInfo>> + iterator = containerStore.iterator(); + + while (iterator.hasNext()) { + final ContainerInfo container = iterator.next().getValue(); + Preconditions.checkNotNull(container); + containers.addContainer(container); + nextContainerID.set(Long.max(container.containerID().getId(), + nextContainerID.get())); + if (container.getState() == LifeCycleState.OPEN) { + try { + pipelineManager.addContainerToPipeline(container.getPipelineID(), + ContainerID.valueof(container.getContainerID())); + } catch (PipelineNotFoundException ex) { + LOG.warn("Found container {} which is in OPEN state with " + + "pipeline {} that does not exist. Marking container for " + + "closing.", container, container.getPipelineID()); + updateContainerState(container.containerID(), + LifeCycleEvent.FINALIZE); + } + } + } + } + + @Override + public ContainerID getNextContainerID() { + return ContainerID.valueof(nextContainerID.get()); + } + + @Override + public Set<ContainerID> getContainerIDs() { + return containers.getAllContainerIDs(); + } + + @Override + public Set<ContainerID> getContainerIDs(final LifeCycleState state) { + return containers.getContainerIDsByState(state); + } + + @Override + public ContainerInfo getContainer(final ContainerID containerID) + throws ContainerNotFoundException { + return containers.getContainerInfo(containerID); + } + + @Override + public Set<ContainerReplica> getContainerReplicas( + final ContainerID containerID) throws ContainerNotFoundException { + return containers.getContainerReplicas(containerID); + } + + @Override + public void addContainer(final ContainerInfoProto containerInfo) + throws IOException { + + // Change the exception thrown to PipelineNotFound and + // ClosedPipelineException once ClosedPipelineException is introduced + // in PipelineManager. 
+ + Preconditions.checkNotNull(containerInfo); + final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo); + if (getContainer(container.containerID()) == null) { + Preconditions.checkArgument(nextContainerID.get() + == container.containerID().getId(), + "ContainerID mismatch."); + + pipelineManager.addContainerToPipeline( + container.getPipelineID(), container.containerID()); + containers.addContainer(container); + nextContainerID.incrementAndGet(); + } + } + + void updateContainerState(final ContainerID containerID, + final LifeCycleEvent event) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + + void updateContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException { + containers.updateContainerReplica(containerID, replica); + } + + + void updateDeleteTransactionId( + final Map<ContainerID, Long> deleteTransactionMap) { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + ContainerInfo getMatchingContainer(final long size, String owner, + PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + + NavigableSet<ContainerID> getMatchingContainerIDs(final String owner, + final ReplicationType type, final ReplicationFactor factor, + final LifeCycleState state) { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + void removeContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException, ContainerReplicaNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + + void removeContainer(final ContainerID containerID) + throws ContainerNotFoundException { + throw new UnsupportedOperationException("Not yet implemented!"); + } + + void close() throws IOException { + } + + public static Builder newBuilder() { + return new Builder(); + } + + 
/** + * Builder for ContainerStateManager. + */ + public static class Builder { + private Configuration conf; + private PipelineManager pipelineMgr; + private SCMRatisServer scmRatisServer; + private Table<ContainerID, ContainerInfo> table; + + public Builder setConfiguration(final Configuration config) { + conf = config; + return this; + } + + public Builder setPipelineManager(final PipelineManager pipelineManager) { + pipelineMgr = pipelineManager; + return this; + } + + public Builder setRatisServer(final SCMRatisServer ratisServer) { + scmRatisServer = ratisServer; + return this; + } + + public Builder setContainerStore( + final Table<ContainerID, ContainerInfo> containerStore) { + table = containerStore; + return this; + } + + public ContainerStateManagerV2 build() throws IOException { + Preconditions.checkNotNull(conf); + Preconditions.checkNotNull(pipelineMgr); + Preconditions.checkNotNull(scmRatisServer); + Preconditions.checkNotNull(table); + + final ContainerStateManagerV2 csm = new ContainerStateManagerImpl( + conf, pipelineMgr, table); + scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm); + + final SCMHAInvocationHandler invocationHandler = + new SCMHAInvocationHandler(RequestType.CONTAINER, csm, + scmRatisServer); + + return (ContainerStateManagerV2) Proxy.newProxyInstance( + SCMHAInvocationHandler.class.getClassLoader(), + new Class<?>[]{ContainerStateManagerV2.class}, invocationHandler); + } + + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java new file mode 100644 index 0000000..9960354 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.metadata.Replicate; + +import java.io.IOException; +import java.util.Set; + +/** + * + * TODO: Add proper javadoc. + * + * Implementation of methods marked with {@code @Replicate} annotation should be + * + * 1. Idempotent + * 2. Arguments should be of protobuf objects + * 3. Return type should be of protobuf object + * 4. 
The declaration should throw RaftException + * + */ +public interface ContainerStateManagerV2 { + + /** + * + */ + ContainerID getNextContainerID(); + + /** + * + */ + Set<ContainerID> getContainerIDs(); + + /** + * + */ + Set<ContainerID> getContainerIDs(LifeCycleState state); + + /** + * + */ + ContainerInfo getContainer(ContainerID containerID) + throws ContainerNotFoundException; + + /** + * + */ + Set<ContainerReplica> getContainerReplicas(ContainerID containerID) + throws ContainerNotFoundException; + + /** + * + */ + @Replicate + void addContainer(ContainerInfoProto containerInfo) + throws IOException; + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java new file mode 100644 index 0000000..1bc1697 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.ha; + +import com.google.common.base.Strings; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; + +import java.io.File; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.ratis.server.RaftServerConfigKeys.Log; +import static org.apache.ratis.server.RaftServerConfigKeys.RetryCache; +import static org.apache.ratis.server.RaftServerConfigKeys.Rpc; +import static org.apache.ratis.server.RaftServerConfigKeys.Snapshot; + +/** + * Ratis Util for SCM HA. + */ +public final class RatisUtil { + + private RatisUtil() { + } + + + /** + * Constructs new Raft Properties instance using {@link SCMHAConfiguration}. + * @param haConf SCMHAConfiguration + * @param conf ConfigurationSource + */ + public static RaftProperties newRaftProperties( + final SCMHAConfiguration haConf, final ConfigurationSource conf) { + //TODO: Remove ConfigurationSource! + // TODO: Check the default values. + final RaftProperties properties = new RaftProperties(); + setRaftStorageDir(properties, haConf, conf); + setRaftRpcProperties(properties, haConf); + setRaftLogProperties(properties, haConf); + setRaftRetryCacheProperties(properties, haConf); + setRaftSnapshotProperties(properties, haConf); + return properties; + } + + /** + * Set the local directory where ratis logs will be stored. 
+ * + * @param properties RaftProperties instance which will be updated + * @param haConf SCMHAConfiguration + * @param conf ConfigurationSource + */ + public static void setRaftStorageDir(final RaftProperties properties, + final SCMHAConfiguration haConf, + final ConfigurationSource conf) { + String storageDir = haConf.getRatisStorageDir(); + if (Strings.isNullOrEmpty(storageDir)) { + storageDir = ServerUtils.getDefaultRatisDirectory(conf); + } + RaftServerConfigKeys.setStorageDir(properties, + Collections.singletonList(new File(storageDir))); + } + + /** + * Set properties related to Raft RPC. + * + * @param properties RaftProperties instance which will be updated + * @param conf SCMHAConfiguration + */ + private static void setRaftRpcProperties(final RaftProperties properties, + final SCMHAConfiguration conf) { + RaftConfigKeys.Rpc.setType(properties, + RpcType.valueOf(conf.getRatisRpcType())); + GrpcConfigKeys.Server.setPort(properties, + conf.getRatisBindAddress().getPort()); + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf("32m")); + + Rpc.setRequestTimeout(properties, TimeDuration.valueOf( + conf.getRatisRequestTimeout(), TimeUnit.MILLISECONDS)); + Rpc.setTimeoutMin(properties, TimeDuration.valueOf( + conf.getRatisRequestMinTimeout(), TimeUnit.MILLISECONDS)); + Rpc.setTimeoutMax(properties, TimeDuration.valueOf( + conf.getRatisRequestMaxTimeout(), TimeUnit.MILLISECONDS)); + Rpc.setSlownessTimeout(properties, TimeDuration.valueOf( + conf.getRatisNodeFailureTimeout(), TimeUnit.MILLISECONDS)); + } + + /** + * Set properties related to Raft Log. 
+   *
+   * <p>The log appender buffer is bounded twice: by number of elements
+   * ({@code getRaftLogAppenderQueueNum}) and by size in bytes
+   * ({@code getRaftLogAppenderQueueByteLimit}).
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param conf SCMHAConfiguration
+   */
+  private static void setRaftLogProperties(final RaftProperties properties,
+      final SCMHAConfiguration conf) {
+    Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(conf.getRaftSegmentSize()));
+    // The element limit is an element count, so it must come from the
+    // queue-num setting; the byte limit is applied separately below.
+    Log.Appender.setBufferElementLimit(properties,
+        conf.getRaftLogAppenderQueueNum());
+    Log.Appender.setBufferByteLimit(properties,
+        SizeInBytes.valueOf(conf.getRaftLogAppenderQueueByteLimit()));
+    Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(conf.getRaftSegmentPreAllocatedSize()));
+    Log.Appender.setInstallSnapshotEnabled(properties, false);
+    Log.setPurgeGap(properties, conf.getRaftLogPurgeGap());
+    Log.setSegmentCacheNumMax(properties, 2);
+  }
+
+  /**
+   * Set properties related to Raft Retry Cache.
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param conf SCMHAConfiguration
+   */
+  private static void setRaftRetryCacheProperties(
+      final RaftProperties properties, final SCMHAConfiguration conf) {
+    RetryCache.setExpiryTime(properties, TimeDuration.valueOf(
+        conf.getRatisRetryCacheTimeout(), TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Set properties related to Raft Snapshot.
+ * + * @param properties RaftProperties instance which will be updated + * @param conf SCMHAConfiguration + */ + private static void setRaftSnapshotProperties( + final RaftProperties properties, final SCMHAConfiguration conf) { + Snapshot.setAutoTriggerEnabled(properties, true); + Snapshot.setAutoTriggerThreshold(properties, 400000); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java new file mode 100644 index 0000000..7c54723 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * Reflection util for SCM HA. + */ +public final class ReflectionUtil { + + private static Map<String, Class<?>> classCache = new HashMap<>(); + + private ReflectionUtil() { + } + + /** + * Returns the {@code Class} object associated with the given string name. + * + * @param className the fully qualified name of the desired class. 
+   * @return the {@code Class} object for the class with the
+   *         specified name.
+   * @throws ClassNotFoundException if the class cannot be located
+   */
+  public static Class<?> getClass(String className)
+      throws ClassNotFoundException {
+    // Look the name up once; only fall back to Class.forName on a miss.
+    Class<?> resolved = classCache.get(className);
+    if (resolved == null) {
+      resolved = Class.forName(className);
+      classCache.put(className, resolved);
+    }
+    return resolved;
+  }
+
+  /**
+   * Looks up a public member method on the given class by name and
+   * parameter types. This is a thin wrapper over
+   * {@link Class#getMethod(String, Class...)}.
+   *
+   * @param clazz the class object which has the method
+   * @param methodName the name of the method
+   * @param arg the list of parameters
+   * @return the {@code Method} object that matches the specified
+   *         {@code name} and {@code parameterTypes}
+   * @throws NoSuchMethodException if a matching method is not found
+   *         or if the name is "<init>" or "<clinit>".
+   */
+  public static Method getMethod(
+      final Class<?> clazz, final String methodName, final Class<?>... arg)
+      throws NoSuchMethodException {
+    final Method found = clazz.getMethod(methodName, arg);
+    return found;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
new file mode 100644
index 0000000..1cb8c65
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.HA;
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Configuration used by SCM HA.
+ *
+ * <p>Every key declared below is relative to the {@code ozone.scm.ha}
+ * prefix of the enclosing {@link ConfigGroup}.
+ */
+@ConfigGroup(prefix = "ozone.scm.ha")
+public class SCMHAConfiguration {
+
+  @Config(key = "ratis.storage.dir",
+      type = ConfigType.STRING,
+      defaultValue = "",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Storage directory used by SCM to write Ratis logs."
+  )
+  private String ratisStorageDir;
+
+  @Config(key = "ratis.bind.host",
+      type = ConfigType.STRING,
+      defaultValue = "0.0.0.0",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Host used by SCM for binding Ratis Server."
+  )
+  private String ratisBindHost = "0.0.0.0";
+
+  // The backing field is an int, so the declared config type must be INT
+  // (it was STRING, which does not match the field type).
+  @Config(key = "ratis.bind.port",
+      type = ConfigType.INT,
+      defaultValue = "9865",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Port used by SCM for Ratis Server."
+ ) + private int ratisBindPort = 9865; + + + @Config(key = "ratis.rpc.type", + type = ConfigType.STRING, + defaultValue = "GRPC", + tags = {SCM, OZONE, HA, RATIS}, + description = "Ratis supports different kinds of transports like" + + " netty, GRPC, Hadoop RPC etc. This picks one of those for" + + " this cluster." + ) + private String ratisRpcType; + + @Config(key = "ratis.segment.size", + type = ConfigType.SIZE, + defaultValue = "16KB", + tags = {SCM, OZONE, HA, RATIS}, + description = "The size of the raft segment used by Apache Ratis on" + + " SCM. (16 KB by default)" + ) + private long raftSegmentSize = 16L * 1024L; + + @Config(key = "ratis.segment.preallocated.size", + type = ConfigType.SIZE, + defaultValue = "16KB", + tags = {SCM, OZONE, HA, RATIS}, + description = "The size of the buffer which is preallocated for" + + " raft segment used by Apache Ratis on SCM.(16 KB by default)" + ) + private long raftSegmentPreAllocatedSize = 16 * 1024; + + @Config(key = "ratis.log.appender.queue.num-elements", + type = ConfigType.INT, + defaultValue = "1024", + tags = {SCM, OZONE, HA, RATIS}, + description = "Number of operation pending with Raft's Log Worker." + ) + private int raftLogAppenderQueueNum = 1024; + + @Config(key = "ratis.log.appender.queue.byte-limit", + type = ConfigType.SIZE, + defaultValue = "32MB", + tags = {SCM, OZONE, HA, RATIS}, + description = "Byte limit for Raft's Log Worker queue." + ) + private int raftLogAppenderQueueByteLimit = 32 * 1024 * 1024; + + @Config(key = "ratis.log.purge.gap", + type = ConfigType.INT, + defaultValue = "1000000", + tags = {SCM, OZONE, HA, RATIS}, + description = "The minimum gap between log indices for Raft server to" + + " purge its log segments after taking snapshot." + ) + private int raftLogPurgeGap = 1000000; + + @Config(key = "ratis.request.timeout", + type = ConfigType.TIME, + defaultValue = "3000ms", + tags = {SCM, OZONE, HA, RATIS}, + description = "The timeout duration for SCM's Ratis server RPC." 
+ ) + private long ratisRequestTimeout = 3000L; + + @Config(key = "ratis.server.retry.cache.timeout", + type = ConfigType.TIME, + defaultValue = "60s", + tags = {SCM, OZONE, HA, RATIS}, + description = "Retry Cache entry timeout for SCM's ratis server." + ) + private long ratisRetryCacheTimeout = 60 * 1000L; + + + @Config(key = "ratis.leader.election.timeout", + type = ConfigType.TIME, + defaultValue = "1s", + tags = {SCM, OZONE, HA, RATIS}, + description = "The minimum timeout duration for SCM ratis leader" + + " election. Default is 1s." + ) + private long ratisLeaderElectionTimeout = 1 * 1000L; + + @Config(key = "ratis.server.failure.timeout.duration", + type = ConfigType.TIME, + defaultValue = "120s", + tags = {SCM, OZONE, HA, RATIS}, + description = "The timeout duration for ratis server failure" + + " detection, once the threshold has reached, the ratis state" + + " machine will be informed about the failure in the ratis ring." + ) + private long ratisNodeFailureTimeout = 120 * 1000L; + + @Config(key = "ratis.server.role.check.interval", + type = ConfigType.TIME, + defaultValue = "15s", + tags = {SCM, OZONE, HA, RATIS}, + description = "The interval between SCM leader performing a role" + + " check on its ratis server. Ratis server informs SCM if it loses" + + " the leader role. 
The scheduled check is an secondary check to" + + " ensure that the leader role is updated periodically" + ) + private long ratisRoleCheckerInterval = 15 * 1000L; + + public String getRatisStorageDir() { + return ratisStorageDir; + } + + public InetSocketAddress getRatisBindAddress() { + return NetUtils.createSocketAddr(ratisBindHost, ratisBindPort); + } + + public String getRatisRpcType() { + return ratisRpcType; + } + + public long getRaftSegmentSize() { + return raftSegmentSize; + } + + public long getRaftSegmentPreAllocatedSize() { + return raftSegmentPreAllocatedSize; + } + + public int getRaftLogAppenderQueueNum() { + return raftLogAppenderQueueNum; + } + + public int getRaftLogAppenderQueueByteLimit() { + return raftLogAppenderQueueByteLimit; + } + + public int getRaftLogPurgeGap() { + return raftLogPurgeGap; + } + + public long getRatisRetryCacheTimeout() { + return ratisRetryCacheTimeout; + } + + public long getRatisRequestTimeout() { + Preconditions.checkArgument(ratisRequestTimeout > 1000L, + "Ratis request timeout cannot be less than 1000ms."); + return ratisRequestTimeout; + } + + public long getRatisRequestMinTimeout() { + return ratisRequestTimeout - 1000L; + } + + public long getRatisRequestMaxTimeout() { + return ratisRequestTimeout + 1000L; + } + + public long getRatisLeaderElectionTimeout() { + return ratisLeaderElectionTimeout; + } + + public long getRatisNodeFailureTimeout() { + return ratisNodeFailureTimeout; + } + + public long getRatisRoleCheckerInterval() { + return ratisRoleCheckerInterval; + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java new file mode 100644 index 0000000..c78c616 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.hadoop.hdds.scm.metadata.Replicate; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * InvocationHandler which checks for {@link Replicate} annotation and + * dispatches the request to Ratis Server. + */ +public class SCMHAInvocationHandler implements InvocationHandler { + + + private static final Logger LOG = LoggerFactory + .getLogger(SCMHAInvocationHandler.class); + + private final RequestType requestType; + private final Object localHandler; + private final SCMRatisServer ratisHandler; + + /** + * TODO. 
+   */
+  public SCMHAInvocationHandler(final RequestType requestType,
+                                final Object localHandler,
+                                final SCMRatisServer ratisHandler) {
+    this.requestType = requestType;
+    this.localHandler = localHandler;
+    this.ratisHandler = ratisHandler;
+  }
+
+  /**
+   * Routes a proxied call: methods annotated with {@link Replicate} are
+   * submitted to the Ratis server, every other method is invoked directly
+   * on the local handler. An {@link InvocationTargetException} raised by
+   * the reflective call is unwrapped so the caller sees the original cause.
+   *
+   * @param proxy the proxy instance the method was invoked on (unused)
+   * @param method the interface method being invoked
+   * @param args the call arguments
+   * @return the value produced by the local or replicated invocation
+   * @throws Throwable whatever the target method or the Ratis call throws
+   */
+  @Override
+  public Object invoke(final Object proxy, final Method method,
+                       final Object[] args) throws Throwable {
+    try {
+      long startTime = Time.monotonicNow();
+      final Object result = method.isAnnotationPresent(Replicate.class) ?
+          invokeRatis(method, args) : invokeLocal(method, args);
+      LOG.debug("Call: {} took {} ms", method, Time.monotonicNow() - startTime);
+      return result;
+    } catch(InvocationTargetException iEx) {
+      throw iEx.getCause();
+    }
+  }
+
+  /**
+   * Invokes the method directly on the local handler, bypassing Ratis.
+   *
+   * @param method the method to invoke
+   * @param args the call arguments
+   * @return the value returned by the local handler
+   * @throws InvocationTargetException if the target method itself throws
+   * @throws IllegalAccessException if the method is not accessible
+   */
+  private Object invokeLocal(Method method, Object[] args)
+      throws InvocationTargetException, IllegalAccessException {
+    LOG.trace("Invoking method {} on target {}", method, localHandler);
+    // The receiver must be the wrapped handler, not the Method object.
+    return method.invoke(localHandler, args);
+  }
+
+  /**
+   * Wraps the call into an {@link SCMRatisRequest}, submits it to the Ratis
+   * server and returns the result carried by the response.
+   *
+   * @param method the method to replicate; only its name is sent
+   * @param args the call arguments
+   * @return the result of a successful response
+   * @throws Exception the exception carried by an unsuccessful response,
+   *         or any failure raised while submitting the request
+   */
+  private Object invokeRatis(Method method, Object[] args)
+      throws Exception {
+    LOG.trace("Invoking method {} on target {}", method, ratisHandler);
+    final SCMRatisResponse response =  ratisHandler.submitRequest(
+        SCMRatisRequest.of(requestType, method.getName(), args));
+    if (response.isSuccess()) {
+      return response.getResult();
+    }
+    // Should we unwrap and throw proper exception from here?
+    throw response.getException();
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
new file mode 100644
index 0000000..b38fc43
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; + +import java.io.IOException; + +/** + * SCMHAManager provides HA service for SCM. + * + * It uses Apache Ratis for HA implementation. We will have a 2N+1 + * node Ratis ring. The Ratis ring will have one Leader node and 2N follower + * nodes. + * + * TODO + * + */ +public class SCMHAManager { + + private static boolean isLeader = true; + + private final SCMRatisServer ratisServer; + + /** + * Creates SCMHAManager instance. + */ + public SCMHAManager(final ConfigurationSource conf) throws IOException { + this.ratisServer = new SCMRatisServer( + conf.getObject(SCMHAConfiguration.class), conf); + } + + /** + * Starts HA service. + */ + public void start() throws IOException { + ratisServer.start(); + } + + /** + * Returns true if the current SCM is the leader. + */ + public static boolean isLeader() { + return isLeader; + } + + /** + * Returns RatisServer instance associated with the SCM instance. + */ + public SCMRatisServer getRatisServer() { + return ratisServer; + } + + /** + * Stops the HA service. 
+ */ + public void shutdown() throws IOException { + ratisServer.stop(); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java new file mode 100644 index 0000000..d65c235 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.primitives.Ints; +import com.google.protobuf.GeneratedMessage; +import com.google.protobuf.InvalidProtocolBufferException; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ProtocolMessageEnum; + +import org.apache.ratis.protocol.Message; + +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.Method; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.MethodArgument; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisRequestProto; + + +/** + * Represents the request that is sent to RatisServer. + */ +public final class SCMRatisRequest { + + private final RequestType type; + private final String operation; + private final Object[] arguments; + + private SCMRatisRequest(final RequestType type, final String operation, + final Object... arguments) { + this.type = type; + this.operation = operation; + this.arguments = arguments; + } + + public static SCMRatisRequest of(final RequestType type, + final String operation, + final Object... arguments) { + return new SCMRatisRequest(type, operation, arguments); + } + + /** + * Returns the type of request. + */ + public RequestType getType() { + return type; + } + + /** + * Returns the operation that this request represents. + */ + public String getOperation() { + return operation; + } + + /** + * Returns the arguments encoded in the request. + */ + public Object[] getArguments() { + return arguments.clone(); + } + + /** + * Encodes the request into Ratis Message. 
+   *
+   * <p>Each argument is written as its class name plus a byte payload:
+   * protobuf messages as their serialized bytes, protobuf enums as the
+   * 4-byte encoding of their number.
+   *
+   * @return the Ratis message wrapping the serialized request proto
+   * @throws InvalidProtocolBufferException if an argument is neither a
+   *         protobuf message nor a protobuf enum
+   */
+  public Message encode() throws InvalidProtocolBufferException {
+    final SCMRatisRequestProto.Builder requestProtoBuilder =
+        SCMRatisRequestProto.newBuilder();
+    requestProtoBuilder.setType(type);
+
+    final Method.Builder methodBuilder = Method.newBuilder();
+    methodBuilder.setName(operation);
+
+    final List<MethodArgument> args = new ArrayList<>();
+    for (Object argument : arguments) {
+      final MethodArgument.Builder argBuilder = MethodArgument.newBuilder();
+      // Use the binary name (Outer$Inner): decode() resolves the type via
+      // Class.forName, which does not accept canonical names of nested
+      // classes such as the generated protobuf types.
+      argBuilder.setType(argument.getClass().getName());
+      if (argument instanceof GeneratedMessage) {
+        argBuilder.setValue(((GeneratedMessage) argument).toByteString());
+      } else if (argument instanceof ProtocolMessageEnum) {
+        argBuilder.setValue(ByteString.copyFrom(Ints.toByteArray(
+            ((ProtocolMessageEnum) argument).getNumber())));
+      } else {
+        throw new InvalidProtocolBufferException(argument.getClass() +
+            " is not a protobuf object!");
+      }
+      args.add(argBuilder.build());
+    }
+    methodBuilder.addAllArgs(args);
+    // Attach the method to the request; without this the operation name
+    // and its arguments never reach the serialized message.
+    requestProtoBuilder.setMethod(methodBuilder.build());
+    return Message.valueOf(
+        org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+            requestProtoBuilder.build().toByteArray()));
+  }
+
+  /**
+   * Decodes the request from Ratis Message.
+   *
+   * <p>Each argument type is resolved by class name and rebuilt through
+   * the static {@code parseFrom(byte[])} (messages) or
+   * {@code valueOf(int)} (enums) method of that class.
+   *
+   * @param message the Ratis message carrying a serialized request proto
+   * @return the decoded request
+   * @throws InvalidProtocolBufferException if the payload cannot be parsed
+   *         or an argument cannot be reconstructed
+   */
+  public static SCMRatisRequest decode(Message message)
+      throws InvalidProtocolBufferException {
+    final SCMRatisRequestProto requestProto =
+        SCMRatisRequestProto.parseFrom(message.getContent().toByteArray());
+    final Method method = requestProto.getMethod();
+    List<Object> args = new ArrayList<>();
+    for (MethodArgument argument : method.getArgsList()) {
+      try {
+        final Class<?> clazz = ReflectionUtil.getClass(argument.getType());
+        if (GeneratedMessage.class.isAssignableFrom(clazz)) {
+          args.add(ReflectionUtil.getMethod(clazz, "parseFrom", byte[].class)
+              .invoke(null, (Object) argument.getValue().toByteArray()));
+        } else if (Enum.class.isAssignableFrom(clazz)) {
+          args.add(ReflectionUtil.getMethod(clazz, "valueOf", int.class)
+              .invoke(null, Ints.fromByteArray(
+                  argument.getValue().toByteArray())));
+        } else {
+          throw new InvalidProtocolBufferException(argument.getType() +
+              " is not a protobuf object!");
+        }
+      } catch (ClassNotFoundException | NoSuchMethodException |
+          IllegalAccessException | InvocationTargetException ex) {
+        // Keep the reflective failure as the cause so its stack trace
+        // is not lost.
+        final InvalidProtocolBufferException decodeFailure =
+            new InvalidProtocolBufferException(argument.getType() +
+                " cannot be decoded!" + ex.getMessage());
+        decodeFailure.initCause(ex);
+        throw decodeFailure;
+      }
+    }
+    return new SCMRatisRequest(requestProto.getType(),
+        method.getName(), args.toArray());
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
new file mode 100644
index 0000000..c4bedcc
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.lang.reflect.InvocationTargetException; +import java.math.BigInteger; + +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ProtocolMessageEnum; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisResponseProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; + +/** + * Represents the response from RatisServer. 
+ */
+public final class SCMRatisResponse {
+
+  private final boolean success;
+  private final Object result;
+  private final Exception exception;
+
+  /** Creates a successful response carrying {@code result}. */
+  private SCMRatisResponse(final Object result) {
+    this(true, result, null);
+  }
+
+  /** Creates a failed response carrying {@code exception}. */
+  private SCMRatisResponse(final Exception exception) {
+    this(false, null, exception);
+  }
+
+  private SCMRatisResponse(final boolean success, final Object result,
+                           final Exception exception) {
+    this.success = success;
+    this.result = result;
+    this.exception = exception;
+  }
+
+  /** Returns true if this response represents a successful call. */
+  public boolean isSuccess() {
+    return success;
+  }
+
+  /** Returns the decoded result; null for a failed response. */
+  public Object getResult() {
+    return result;
+  }
+
+  /** Returns the failure; null for a successful response. */
+  public Exception getException() {
+    return exception;
+  }
+
+  /**
+   * Encodes a result object into a Ratis Message.
+   *
+   * @param result a protobuf message or protobuf enum
+   * @return the Ratis message wrapping the serialized response proto
+   * @throws InvalidProtocolBufferException if {@code result} is neither a
+   *         protobuf message nor a protobuf enum (including null)
+   */
+  public static Message encode(final Object result)
+      throws InvalidProtocolBufferException {
+
+    final ByteString value;
+    if (result instanceof GeneratedMessage) {
+      value = ((GeneratedMessage) result).toByteString();
+    } else if (result instanceof ProtocolMessageEnum) {
+      value = ByteString.copyFrom(BigInteger.valueOf(
+          ((ProtocolMessageEnum) result).getNumber()).toByteArray());
+    } else {
+      // Guard the diagnostic itself: result.getClass() would throw a
+      // NullPointerException for a null result.
+      throw new InvalidProtocolBufferException(
+          (result == null ? "null" : result.getClass()) +
+              " is not a protobuf object!");
+    }
+
+    // Use the binary name (Outer$Inner): deserializeResult() resolves the
+    // type via Class.forName, which does not accept canonical names of
+    // nested classes such as the generated protobuf types.
+    final SCMRatisResponseProto response =
+        SCMRatisResponseProto.newBuilder()
+            .setType(result.getClass().getName())
+            .setValue(value)
+            .build();
+    return Message.valueOf(
+        org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+            response.toByteArray()));
+  }
+
+  /**
+   * Decodes a Ratis client reply: a successful reply is deserialized into
+   * its result object, a failed one carries the reply's exception.
+   *
+   * @param reply the reply received from Ratis
+   * @return the decoded response
+   * @throws InvalidProtocolBufferException if the result cannot be decoded
+   */
+  public static SCMRatisResponse decode(RaftClientReply reply)
+      throws InvalidProtocolBufferException {
+    return reply.isSuccess() ?
+        new SCMRatisResponse(
+            deserializeResult(reply.getMessage().getContent().toByteArray())) :
+        new SCMRatisResponse(reply.getException());
+  }
+
+  /**
+   * Rebuilds the result object from a serialized response proto, using the
+   * static {@code parseFrom(byte[])} (messages) or {@code valueOf(int)}
+   * (enums) method of the recorded type.
+   */
+  private static Object deserializeResult(byte[] response)
+      throws InvalidProtocolBufferException {
+    final SCMRatisResponseProto responseProto =
+        SCMRatisResponseProto.parseFrom(response);
+    try {
+      final Class<?> clazz = ReflectionUtil.getClass(responseProto.getType());
+      if (GeneratedMessage.class.isAssignableFrom(clazz)) {
+        return ReflectionUtil.getMethod(clazz, "parseFrom", byte[].class)
+            .invoke(null, (Object) responseProto.getValue().toByteArray());
+      }
+
+      if (Enum.class.isAssignableFrom(clazz)) {
+        return ReflectionUtil.getMethod(clazz, "valueOf", int.class)
+            .invoke(null, new BigInteger(
+                responseProto.getValue().toByteArray()).intValue());
+      }
+
+      throw new InvalidProtocolBufferException(responseProto.getType() +
+          " is not a protobuf object!");
+
+    } catch (ClassNotFoundException | NoSuchMethodException |
+        IllegalAccessException | InvocationTargetException ex) {
+      // Keep the reflective failure as the cause so its stack trace
+      // is not lost.
+      final InvalidProtocolBufferException decodeFailure =
+          new InvalidProtocolBufferException(responseProto.getType() +
+              " cannot be decoded!" + ex.getMessage());
+      decodeFailure.initCause(ex);
+      throw decodeFailure;
+    }
+
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
new file mode 100644
index 0000000..209535d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; + +/** + * TODO. + */ +public class SCMRatisServer { + + private final InetSocketAddress address; + private final RaftServer server; + private final RaftGroupId raftGroupId; + private final RaftGroup raftGroup; + private final RaftPeerId raftPeerId; + private final SCMStateMachine scmStateMachine; + private final ClientId clientId = ClientId.randomId(); + private final AtomicLong callId = new AtomicLong(); + + + // TODO: Refactor and remove ConfigurationSource and use only + // SCMHAConfiguration. 
+ SCMRatisServer(final SCMHAConfiguration haConf, + final ConfigurationSource conf) + throws IOException { + final String scmServiceId = "SCM-HA-Service"; + final String scmNodeId = "localhost"; + this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId); + this.address = haConf.getRatisBindAddress(); + final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address); + final List<RaftPeer> raftPeers = new ArrayList<>(); + raftPeers.add(localRaftPeer); + final RaftProperties serverProperties = RatisUtil + .newRaftProperties(haConf, conf); + this.raftGroupId = RaftGroupId.valueOf( + UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8))); + this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); + this.scmStateMachine = new SCMStateMachine(); + this.server = RaftServer.newBuilder() + .setServerId(raftPeerId) + .setGroup(raftGroup) + .setProperties(serverProperties) + .setStateMachine(scmStateMachine) + .build(); + } + + void start() throws IOException { + server.start(); + } + + public void registerStateMachineHandler(final RequestType handlerType, + final Object handler) { + scmStateMachine.registerHandler(handlerType, handler); + } + + SCMRatisResponse submitRequest(SCMRatisRequest request) + throws IOException, ExecutionException, InterruptedException { + final RaftClientRequest raftClientRequest = new RaftClientRequest( + clientId, server.getId(), raftGroupId, nextCallId(), request.encode(), + RaftClientRequest.writeRequestType(), null); + final RaftClientReply raftClientReply = + server.submitClientRequestAsync(raftClientRequest).get(); + return SCMRatisResponse.decode(raftClientReply); + } + + private long nextCallId() { + return callId.getAndIncrement() & Long.MAX_VALUE; + } + + void stop() throws IOException { + server.close(); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java new file mode 
100644 index 0000000..b10dd54 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; + +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; + +/** + * TODO. 
+ */ +public class SCMStateMachine extends BaseStateMachine { + + private final Map<RequestType, Object> handlers; + + public SCMStateMachine() { + this.handlers = new EnumMap<>(RequestType.class); + } + + public void registerHandler(RequestType type, Object handler) { + handlers.put(type, handler); + } + + @Override + public CompletableFuture<Message> applyTransaction( + final TransactionContext trx) { + final CompletableFuture<Message> applyTransactionFuture = + new CompletableFuture<>(); + try { + final SCMRatisRequest request = SCMRatisRequest.decode( + trx.getClientRequest().getMessage()); + applyTransactionFuture.complete(process(request)); + } catch (Exception ex) { + applyTransactionFuture.completeExceptionally(ex); + } + return applyTransactionFuture; + } + + private Message process(final SCMRatisRequest request) + throws Exception { + try { + final Object handler = handlers.get(request.getType()); + + if (handler == null) { + throw new IOException("No handler found for request type " + + request.getType()); + } + + final List<Class<?>> argumentTypes = new ArrayList<>(); + for(Object args : request.getArguments()) { + argumentTypes.add(args.getClass()); + } + final Object result = handler.getClass().getMethod( + request.getOperation(), argumentTypes.toArray(new Class<?>[0])) + .invoke(handler, request.getArguments()); + + return SCMRatisResponse.encode(result); + } catch (NoSuchMethodException | SecurityException ex) { + throw new InvalidProtocolBufferException(ex.getMessage()); + } catch (InvocationTargetException e) { + final Exception targetEx = (Exception) e.getTargetException(); + throw targetEx != null ? 
targetEx : e; + } + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java new file mode 100644 index 0000000..aeed57c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.metadata; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * TODO: Add javadoc. 
/**
 * Marker annotation for methods.
 *
 * <p>The annotation declares no elements, can only be placed on methods and
 * is retained at runtime, so it can be detected reflectively.
 *
 * <p>NOTE(review): presumably marks operations that must be replicated
 * through Ratis rather than executed locally - confirm against the code
 * that reads this annotation.
 *
 * <p>NOTE(review): {@code @Inherited} only affects annotations placed on
 * classes; it has no effect for a method-targeted annotation such as this
 * one.
 */
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Replicate {
}
// Wire format of the requests and responses exchanged through SCM Ratis.

option java_package = "org.apache.hadoop.hdds.protocol.proto";
option java_outer_classname = "SCMRatisProtocol";
option java_generate_equals_and_hash = true;

// Category of a request.
enum RequestType {
  PIPELINE = 1;
  CONTAINER = 2;
}

// A method call: the method name and its arguments in order.
message Method {
  required string name = 1;
  repeated MethodArgument args = 2;
}

// A single serialized argument.
// NOTE(review): presumably "type" is the Java class name of the argument
// and "value" its serialized bytes, mirroring SCMRatisResponseProto -
// confirm against the request encoder.
message MethodArgument {
  required string type = 1;
  required bytes value = 2;
}

// A request: its category and the method to invoke.
message SCMRatisRequestProto {
  required RequestType type = 1;
  required Method method = 2;
}

// A response: the Java class name of the result and its serialized bytes.
// Field numbers start at 2 (1 is unused); they are part of the wire format
// and must not be renumbered.
message SCMRatisResponseProto {
  required string type = 2;
  required bytes value = 3;
}