This is an automated email from the ASF dual-hosted git repository. licheng pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit c8367208276b4137261992f3ff395f22e7db34b0 Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Tue Jun 2 02:18:37 2020 +0800 HDDS-3196 New PipelineManager interface to persist to RatisServer. (#980) --- .../hdds/scm/pipeline/PipelineManagerV2Impl.java | 617 +++++++++++++++++++++ .../hdds/scm/pipeline/PipelineStateManagerV2.java | 99 ++++ .../scm/pipeline/PipelineStateManagerV2Impl.java | 226 ++++++++ .../hdds/scm/pipeline/RatisPipelineUtils.java | 19 + 4 files changed, 961 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java new file mode 100644 index 0000000..a6a3249 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java @@ -0,0 +1,617 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.utils.Scheduler; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * SCM Pipeline Manager implementation. + * All the write operations for pipelines must come via PipelineManager. + * It synchronises all write and read operations via a ReadWriteLock. 
+ */
+public class PipelineManagerV2Impl implements PipelineManager {
+  // Log under this class's own name rather than the legacy SCMPipelineManager.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineManagerV2Impl.class);
+
+  private final ReadWriteLock lock;
+  private PipelineFactory pipelineFactory;
+  private PipelineStateManagerV2 stateManager;
+  private Scheduler scheduler;
+  private BackgroundPipelineCreator backgroundPipelineCreator;
+  private final ConfigurationSource conf;
+  // Pipeline Manager MXBean
+  private ObjectName pmInfoBean;
+  private final SCMPipelineMetrics metrics;
+  private long pipelineWaitDefaultTimeout;
+  private final AtomicBoolean isInSafeMode;
+  // Used to track if the safemode pre-checks have completed. This is designed
+  // to prevent pipelines being created until sufficient nodes have registered.
+  private final AtomicBoolean pipelineCreationAllowed;
+
+  public PipelineManagerV2Impl(ConfigurationSource conf,
+      NodeManager nodeManager,
+      PipelineStateManagerV2 pipelineStateManager,
+      PipelineFactory pipelineFactory) {
+    this.lock = new ReentrantReadWriteLock();
+    this.pipelineFactory = pipelineFactory;
+    this.stateManager = pipelineStateManager;
+    this.conf = conf;
+    this.pmInfoBean = MBeans.register("SCMPipelineManager",
+        "SCMPipelineManagerInfo", this);
+    this.metrics = SCMPipelineMetrics.create();
+    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
+    // Pipeline creation is only allowed after the safemode prechecks have
+    // passed, eg sufficient nodes have registered.
+    this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
+  }
+
+  public static PipelineManager newPipelineManager(
+      ConfigurationSource conf, SCMHAManager scmhaManager,
+      NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
+      PipelineFactory pipelineFactory) throws IOException {
+    // Create PipelineStateManager
+    PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl
+        .newBuilder().setPipelineStore(pipelineStore)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .build();
+
+    // Create PipelineManager
+    PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
+        nodeManager, stateManager, pipelineFactory);
+
+    // Create background thread.
+    Scheduler scheduler = new Scheduler(
+        "RatisPipelineUtilsThread", false, 1);
+    BackgroundPipelineCreator backgroundPipelineCreator =
+        new BackgroundPipelineCreator(pipelineManager, scheduler, conf);
+    pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+    pipelineManager.setScheduler(scheduler);
+
+    return pipelineManager;
+  }
+
+  @Override
+  public Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
+    if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) {
+      LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
+          "complete");
+      throw new IOException("Pipeline creation is not allowed as safe mode " +
+          "prechecks have not yet passed");
+    }
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline = pipelineFactory.create(type, factor);
+      stateManager.addPipeline(pipeline.getProtobufMessage());
+      recordMetricsForPipeline(pipeline);
+      return pipeline;
+    } catch (IOException ex) {
+      LOG.error("Failed to create pipeline of type {} and factor {}. " +
+          "Exception: {}", type, factor, ex.getMessage());
+      metrics.incNumPipelineCreationFailed();
+      throw ex;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
+      List<DatanodeDetails> nodes) {
+    return null;
+  }
+
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipeline(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean containsPipeline(PipelineID pipelineID) {
+    lock.readLock().lock();
+    try {
+      getPipeline(pipelineID);
+      return true;
+    } catch (PipelineNotFoundException e) {
+      return false;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines() {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type) {
+    // Acquire the read lock before the try block: the finally clause unlocks
+    // it, and unlocking a read lock that this thread does not hold throws
+    // IllegalMonitorStateException.
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, factor);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, factor, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(
+      ReplicationType type, ReplicationFactor
factor, + Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns, + Collection<PipelineID> excludePipelines) { + lock.readLock().lock(); + try { + return stateManager + .getPipelines(type, factor, state, excludeDns, excludePipelines); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void addContainerToPipeline( + PipelineID pipelineID, ContainerID containerID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.addContainerToPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void removeContainerFromPipeline( + PipelineID pipelineID, ContainerID containerID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.removeContainerFromPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public NavigableSet<ContainerID> getContainersInPipeline( + PipelineID pipelineID) throws IOException { + lock.readLock().lock(); + try { + return stateManager.getContainers(pipelineID); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public int getNumberOfContainers(PipelineID pipelineID) throws IOException { + return stateManager.getNumberOfContainers(pipelineID); + } + + @Override + public void openPipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + Pipeline pipeline = stateManager.getPipeline(pipelineId); + if (pipeline.isClosed()) { + throw new IOException("Closed pipeline can not be opened"); + } + if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + LOG.info("Pipeline {} moved to OPEN state", pipeline); + stateManager.updatePipelineState(pipelineId.getProtobuf(), + HddsProtos.PipelineState.PIPELINE_OPEN); + } + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Finalizes pipeline in the SCM. 
Removes pipeline and makes rpc call to + * destroy pipeline on the datanodes immediately or after timeout based on the + * value of onTimeout parameter. + * + * @param pipeline - Pipeline to be destroyed + * @param onTimeout - if true pipeline is removed and destroyed on + * datanodes after timeout + * @throws IOException + */ + @Override + public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) + throws IOException { + LOG.info("Destroying pipeline:{}", pipeline); + finalizePipeline(pipeline.getId()); + if (onTimeout) { + long pipelineDestroyTimeoutInMillis = + conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + scheduler.schedule(() -> destroyPipeline(pipeline), + pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, + String.format("Destroy pipeline failed for pipeline:%s", pipeline)); + } else { + destroyPipeline(pipeline); + } + } + + /** + * Moves the pipeline to CLOSED state and sends close container command for + * all the containers in the pipeline. + * + * @param pipelineId - ID of the pipeline to be moved to CLOSED state. + * @throws IOException + */ + private void finalizePipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + Pipeline pipeline = stateManager.getPipeline(pipelineId); + if (!pipeline.isClosed()) { + stateManager.updatePipelineState(pipelineId.getProtobuf(), + HddsProtos.PipelineState.PIPELINE_CLOSED); + LOG.info("Pipeline {} moved to CLOSED state", pipeline); + } + + // TODO fire events to datanodes for closing pipelines +// Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId); +// for (ContainerID containerID : containerIDs) { +// eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); +// } + metrics.removePipelineMetrics(pipelineId); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Removes pipeline from SCM. 
Sends ratis command to destroy pipeline on all
+   * the datanodes for ratis pipelines.
+   *
+   * @param pipeline - Pipeline to be destroyed
+   * @throws IOException
+   */
+  protected void destroyPipeline(Pipeline pipeline) throws IOException {
+    pipelineFactory.close(pipeline.getType(), pipeline);
+    // remove the pipeline from the pipeline manager
+    removePipeline(pipeline.getId());
+    triggerPipelineCreation();
+  }
+
+  /**
+   * Removes the pipeline from the db and pipeline state map.
+   *
+   * @param pipelineId - ID of the pipeline to be removed
+   * @throws IOException
+   */
+  protected void removePipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.removePipeline(pipelineId.getProtobuf());
+      metrics.incNumPipelineDestroyed();
+    } catch (IOException ex) {
+      metrics.incNumPipelineDestroyFailed();
+      throw ex;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+      throws IOException{
+    if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
+      // Only scrub pipeline for RATIS THREE pipeline
+      return;
+    }
+    Instant currentTime = Instant.now();
+    long pipelineScrubTimeoutInMills = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    List<Pipeline> needToScrubPipelines = stateManager.getPipelines(type, factor,
+        Pipeline.PipelineState.ALLOCATED).stream()
+        .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
+            .toEpochMilli() >= pipelineScrubTimeoutInMills)
+        .collect(Collectors.toList());
+    for (Pipeline p : needToScrubPipelines) {
+      // Duration.between(start, end): creation time first so the age is positive.
+      LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+          " since it stays at ALLOCATED stage for " +
+          Duration.between(p.getCreationTimestamp(), currentTime).toMinutes() +
+          " mins.");
+      finalizeAndDestroyPipeline(p, false);
+    }
+  }
+
+  /**
+   * Schedules a fixed
interval job to create pipelines. + */ + @Override + public void startPipelineCreator() { + backgroundPipelineCreator.startFixedIntervalPipelineCreator(); + } + + /** + * Triggers pipeline creation after the specified time. + */ + @Override + public void triggerPipelineCreation() { + backgroundPipelineCreator.triggerPipelineCreation(); + } + + @Override + public void incNumBlocksAllocatedMetric(PipelineID id) { + metrics.incNumBlocksAllocated(id); + } + + /** + * Activates a dormant pipeline. + * + * @param pipelineID ID of the pipeline to activate. + * @throws IOException in case of any Exception + */ + @Override + public void activatePipeline(PipelineID pipelineID) + throws IOException { + stateManager.updatePipelineState(pipelineID.getProtobuf(), + HddsProtos.PipelineState.PIPELINE_OPEN); + } + + /** + * Deactivates an active pipeline. + * + * @param pipelineID ID of the pipeline to deactivate. + * @throws IOException in case of any Exception + */ + @Override + public void deactivatePipeline(PipelineID pipelineID) + throws IOException { + stateManager.updatePipelineState(pipelineID.getProtobuf(), + HddsProtos.PipelineState.PIPELINE_DORMANT); + } + + /** + * Wait a pipeline to be OPEN. + * + * @param pipelineID ID of the pipeline to wait for. 
+ * @param timeout wait timeout, millisecond, 0 to use default value + * @throws IOException in case of any Exception, such as timeout + */ + @Override + public void waitPipelineReady(PipelineID pipelineID, long timeout) + throws IOException { + long st = Time.monotonicNow(); + if (timeout == 0) { + timeout = pipelineWaitDefaultTimeout; + } + + boolean ready; + Pipeline pipeline; + do { + try { + pipeline = stateManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + throw new PipelineNotFoundException(String.format( + "Pipeline %s cannot be found", pipelineID)); + } + ready = pipeline.isOpen(); + if (!ready) { + try { + Thread.sleep((long)100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } while (!ready && Time.monotonicNow() - st < timeout); + + if (!ready) { + throw new IOException(String.format("Pipeline %s is not ready in %d ms", + pipelineID, timeout)); + } + } + + @Override + public Map<String, Integer> getPipelineInfo() { + final Map<String, Integer> pipelineInfo = new HashMap<>(); + for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { + pipelineInfo.put(state.toString(), 0); + } + stateManager.getPipelines().forEach(pipeline -> + pipelineInfo.computeIfPresent( + pipeline.getPipelineState().toString(), (k, v) -> v + 1)); + return pipelineInfo; + } + + /** + * Get SafeMode status. + * @return boolean + */ + @Override + public boolean getSafeModeStatus() { + return this.isInSafeMode.get(); + } + + @Override + public void close() throws IOException { + if (scheduler != null) { + scheduler.close(); + scheduler = null; + } + + if(pmInfoBean != null) { + MBeans.unregister(this.pmInfoBean); + pmInfoBean = null; + } + + SCMPipelineMetrics.unRegister(); + + // shutdown pipeline provider. 
+ pipelineFactory.shutdown(); + } + + @Override + public void onMessage(SCMSafeModeManager.SafeModeStatus status, + EventPublisher publisher) { + // TODO: #CLUTIL - handle safemode getting re-enabled + boolean currentAllowPipelines = + pipelineCreationAllowed.getAndSet(status.isPreCheckComplete()); + boolean currentlyInSafeMode = + isInSafeMode.getAndSet(status.isInSafeMode()); + + // Trigger pipeline creation only if the preCheck status has changed to + // complete. + if (isPipelineCreationAllowed() && !currentAllowPipelines) { + triggerPipelineCreation(); + } + // Start the pipeline creation thread only when safemode switches off + if (!getSafeModeStatus() && currentlyInSafeMode) { + startPipelineCreator(); + } + } + + @VisibleForTesting + public boolean isPipelineCreationAllowed() { + return pipelineCreationAllowed.get(); + } + + private void setBackgroundPipelineCreator( + BackgroundPipelineCreator backgroundPipelineCreator) { + this.backgroundPipelineCreator = backgroundPipelineCreator; + } + + private void setScheduler(Scheduler scheduler) { + this.scheduler = scheduler; + } + + private void recordMetricsForPipeline(Pipeline pipeline) { + metrics.incNumPipelineAllocated(); + if (pipeline.isOpen()) { + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); + } + switch (pipeline.getType()) { + case STAND_ALONE: + return; + case RATIS: + List<Pipeline> overlapPipelines = RatisPipelineUtils + .checkPipelineContainSameDatanodes(stateManager, pipeline); + if (!overlapPipelines.isEmpty()) { + // Count 1 overlap at a time. + metrics.incNumPipelineContainSameDatanodes(); + //TODO remove until pipeline allocation is proved equally distributed. 
+ for (Pipeline overlapPipeline : overlapPipelines) { + LOG.info("Pipeline: " + pipeline.getId().toString() + + " contains same datanodes as previous pipelines: " + + overlapPipeline.getId().toString() + " nodeIds: " + + pipeline.getNodes().get(0).getUuid().toString() + + ", " + pipeline.getNodes().get(1).getUuid().toString() + + ", " + pipeline.getNodes().get(2).getUuid().toString()); + } + } + return; + case CHAINED: + // Not supported. + default: + // Not supported. + return; + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java new file mode 100644 index 0000000..4021575 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.metadata.Replicate; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.NavigableSet; + +/** + * Manages the state of pipelines in SCM. + */ +public interface PipelineStateManagerV2 { + + /** + * Adding pipeline would be replicated to Ratis. + * @param pipelineProto + * @throws IOException + */ + @Replicate + void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException; + + /** + * Removing pipeline would be replicated to Ratis. + * @param pipelineIDProto + * @return Pipeline removed + * @throws IOException + */ + @Replicate + void removePipeline(HddsProtos.PipelineID pipelineIDProto) + throws IOException; + + /** + * Updating pipeline state would be replicated to Ratis. 
+ * @param pipelineIDProto + * @param newState + * @throws IOException + */ + @Replicate + void updatePipelineState(HddsProtos.PipelineID pipelineIDProto, + HddsProtos.PipelineState newState) + throws IOException; + + void addContainerToPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException; + + Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException; + + List<Pipeline> getPipelines(); + + List<Pipeline> getPipelines(HddsProtos.ReplicationType type); + + List<Pipeline> getPipelines(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor); + + List<Pipeline> getPipelines(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, + Pipeline.PipelineState state); + + List<Pipeline> getPipelines(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, + Pipeline.PipelineState state, + Collection<DatanodeDetails> excludeDns, + Collection<PipelineID> excludePipelines); + + List<Pipeline> getPipelines(HddsProtos.ReplicationType type, + Pipeline.PipelineState... states); + + NavigableSet<ContainerID> getContainers(PipelineID pipelineID) + throws IOException; + + int getNumberOfContainers(PipelineID pipelineID) throws IOException; + + + void removeContainerFromPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException; + + void close() throws Exception; +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java new file mode 100644 index 0000000..c74dc86 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.util.Collection; +import java.util.List; +import java.util.NavigableSet; + +/** + * Implementation of pipeline state manager. + * PipelineStateMap class holds the data structures related to pipeline and its + * state. All the read and write operations in PipelineStateMap are protected + * by a read write lock. 
+ */
+public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
+
+  // Log under this class's own name rather than the legacy PipelineStateManager.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineStateManagerV2Impl.class);
+
+  private final PipelineStateMap pipelineStateMap;
+  private final NodeManager nodeManager;
+  private Table<PipelineID, Pipeline> pipelineStore;
+
+  public PipelineStateManagerV2Impl(
+      Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
+      throws IOException {
+    this.pipelineStateMap = new PipelineStateMap();
+    this.nodeManager = nodeManager;
+    this.pipelineStore = pipelineStore;
+    initialize();
+  }
+
+  private void initialize() throws IOException {
+    // Both collaborators are required; the message names both because this
+    // check also fires when only the NodeManager is missing.
+    if (pipelineStore == null || nodeManager == null) {
+      throw new IOException("PipelineStore and NodeManager cannot be null");
+    }
+    if (pipelineStore.isEmpty()) {
+      LOG.info("No pipeline exists in current db");
+      return;
+    }
+    TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>>
+        iterator = pipelineStore.iterator();
+    while (iterator.hasNext()) {
+      Pipeline pipeline = iterator.next().getValue();
+      addPipeline(pipeline.getProtobufMessage());
+    }
+  }
+
+  @Override
+  public void addPipeline(HddsProtos.Pipeline pipelineProto)
+      throws IOException {
+    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
+    pipelineStore.put(pipeline.getId(), pipeline);
+    pipelineStateMap.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
+    LOG.info("Created pipeline {}.", pipeline);
+  }
+
+  @Override
+  public void addContainerToPipeline(
+      PipelineID pipelineId, ContainerID containerID)
+      throws IOException {
+    pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+  }
+
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    return pipelineStateMap.getPipeline(pipelineID);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines() {
+    return pipelineStateMap.getPipelines();
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
+    return
pipelineStateMap.getPipelines(type); + } + + @Override + public List<Pipeline> getPipelines( + HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) { + return pipelineStateMap.getPipelines(type, factor); + } + + @Override + public List<Pipeline> getPipelines( + HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, + Pipeline.PipelineState state) { + return pipelineStateMap.getPipelines(type, factor, state); + } + + @Override + public List<Pipeline> getPipelines( + HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, + Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns, + Collection<PipelineID> excludePipelines) { + return pipelineStateMap + .getPipelines(type, factor, state, excludeDns, excludePipelines); + } + + @Override + public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, + Pipeline.PipelineState... states) { + return pipelineStateMap.getPipelines(type, states); + } + + @Override + public NavigableSet<ContainerID> getContainers(PipelineID pipelineID) + throws IOException { + return pipelineStateMap.getContainers(pipelineID); + } + + @Override + public int getNumberOfContainers(PipelineID pipelineID) throws IOException { + return pipelineStateMap.getNumberOfContainers(pipelineID); + } + + @Override + public void removePipeline(HddsProtos.PipelineID pipelineIDProto) + throws IOException { + PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto); + pipelineStore.delete(pipelineID); + Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID); + nodeManager.removePipeline(pipeline); + LOG.info("Pipeline {} removed.", pipeline); + return; + } + + + @Override + public void removeContainerFromPipeline( + PipelineID pipelineID, ContainerID containerID) throws IOException { + pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); + } + + @Override + public void updatePipelineState( + HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState 
newState) + throws IOException { + pipelineStateMap.updatePipelineState( + PipelineID.getFromProtobuf(pipelineIDProto), + Pipeline.PipelineState.fromProtobuf(newState)); + } + + @Override + public void close() throws Exception { + pipelineStore.close(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for PipelineStateManager. + */ + public static class Builder { + private Table<PipelineID, Pipeline> pipelineStore; + private NodeManager nodeManager; + private SCMRatisServer scmRatisServer; + + public Builder setRatisServer(final SCMRatisServer ratisServer) { + scmRatisServer = ratisServer; + return this; + } + + public Builder setNodeManager(final NodeManager scmNodeManager) { + nodeManager = scmNodeManager; + return this; + } + + public Builder setPipelineStore( + final Table<PipelineID, Pipeline> pipelineTable) { + this.pipelineStore = pipelineTable; + return this; + } + + public PipelineStateManagerV2 build() throws IOException { + Preconditions.checkNotNull(pipelineStore); + + final PipelineStateManagerV2 pipelineStateManager = + new PipelineStateManagerV2Impl(pipelineStore, nodeManager); + + final SCMHAInvocationHandler invocationHandler = + new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE, + pipelineStateManager, scmRatisServer); + + return (PipelineStateManagerV2) Proxy.newProxyInstance( + SCMHAInvocationHandler.class.getClassLoader(), + new Class<?>[]{PipelineStateManagerV2.class}, invocationHandler); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 5c9b202..edc40af 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -117,4 +117,23 @@ public final class 
RatisPipelineUtils { p.sameDatanodes(pipeline))) .collect(Collectors.toList()); } + + /** + * Return the list of pipelines who share the same set of datanodes + * with the input pipeline. + * + * @param stateManager PipelineStateManager + * @param pipeline input pipeline + * @return list of matched pipeline + */ + static List<Pipeline> checkPipelineContainSameDatanodes( + PipelineStateManagerV2 stateManager, Pipeline pipeline) { + return stateManager.getPipelines( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE) + .stream().filter(p -> !p.getId().equals(pipeline.getId()) && + (p.getPipelineState() != Pipeline.PipelineState.CLOSED && + p.sameDatanodes(pipeline))) + .collect(Collectors.toList()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org