http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java deleted file mode 100644 index 91fa9c3..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdsl.conf.OzoneConfiguration; -import org.apache.hadoop.hdsl.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler - .CloseContainerHandler; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopExecutors; - -import static org.apache.hadoop.ozone.scm.HdslServerUtil.getScmHeartbeatInterval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * State Machine Class. 
- */ -public class DatanodeStateMachine implements Closeable { - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(DatanodeStateMachine.class); - private final ExecutorService executorService; - private final Configuration conf; - private final SCMConnectionManager connectionManager; - private final long heartbeatFrequency; - private StateContext context; - private final OzoneContainer container; - private DatanodeDetails datanodeDetails; - private final CommandDispatcher commandDispatcher; - private long commandsHandled; - private AtomicLong nextHB; - private Thread stateMachineThread = null; - private Thread cmdProcessThread = null; - - /** - * Constructs a a datanode state machine. - * - * @param datanodeDetails - DatanodeDetails used to identify a datanode - * @param conf - Configuration. - */ - public DatanodeStateMachine(DatanodeDetails datanodeDetails, - Configuration conf) throws IOException { - this.conf = conf; - this.datanodeDetails = datanodeDetails; - executorService = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Datanode State Machine Thread - %d").build()); - connectionManager = new SCMConnectionManager(conf); - context = new StateContext(this.conf, DatanodeStates.getInitState(), this); - heartbeatFrequency = TimeUnit.SECONDS.toMillis( - getScmHeartbeatInterval(conf)); - container = new OzoneContainer(this.datanodeDetails, - new OzoneConfiguration(conf)); - nextHB = new AtomicLong(Time.monotonicNow()); - - // When we add new handlers just adding a new handler here should do the - // trick. 
- commandDispatcher = CommandDispatcher.newBuilder() - .addHandler(new ContainerReportHandler()) - .addHandler(new CloseContainerHandler()) - .addHandler(new DeleteBlocksCommandHandler( - container.getContainerManager(), conf)) - .setConnectionManager(connectionManager) - .setContainer(container) - .setContext(context) - .build(); - } - - /** - * - * Return DatanodeDetails if set, return null otherwise. - * - * @return DatanodeDetails - */ - public DatanodeDetails getDatanodeDetails() { - return datanodeDetails; - } - - - /** - * Returns the Connection manager for this state machine. - * - * @return - SCMConnectionManager. - */ - public SCMConnectionManager getConnectionManager() { - return connectionManager; - } - - public OzoneContainer getContainer() { - return this.container; - } - - /** - * Runs the state machine at a fixed frequency. - */ - private void start() throws IOException { - long now = 0; - - container.start(); - initCommandHandlerThread(conf); - while (context.getState() != DatanodeStates.SHUTDOWN) { - try { - LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); - nextHB.set(Time.monotonicNow() + heartbeatFrequency); - context.setReportState(container.getNodeReport()); - context.setContainerReportState(container.getContainerReportState()); - context.execute(executorService, heartbeatFrequency, - TimeUnit.MILLISECONDS); - now = Time.monotonicNow(); - if (now < nextHB.get()) { - Thread.sleep(nextHB.get() - now); - } - } catch (InterruptedException e) { - // Ignore this exception. - } catch (Exception e) { - LOG.error("Unable to finish the execution.", e); - } - } - } - - /** - * Gets the current context. - * - * @return StateContext - */ - public StateContext getContext() { - return context; - } - - /** - * Sets the current context. - * - * @param context - Context - */ - public void setContext(StateContext context) { - this.context = context; - } - - /** - * Closes this stream and releases any system resources associated with it. 
If - * the stream is already closed then invoking this method has no effect. - * <p> - * <p> As noted in {@link AutoCloseable#close()}, cases where the close may - * fail require careful attention. It is strongly advised to relinquish the - * underlying resources and to internally <em>mark</em> the {@code Closeable} - * as closed, prior to throwing the {@code IOException}. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - if (stateMachineThread != null) { - stateMachineThread.interrupt(); - } - if (cmdProcessThread != null) { - cmdProcessThread.interrupt(); - } - context.setState(DatanodeStates.getLastState()); - executorService.shutdown(); - try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - LOG.error("Unable to shutdown state machine properly."); - } - } catch (InterruptedException e) { - LOG.error("Error attempting to shutdown.", e); - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } - - if (connectionManager != null) { - connectionManager.close(); - } - - if(container != null) { - container.stop(); - } - } - - /** - * States that a datanode can be in. GetNextState will move this enum from - * getInitState to getLastState. - */ - public enum DatanodeStates { - INIT(1), - RUNNING(2), - SHUTDOWN(3); - private final int value; - - /** - * Constructs ContainerStates. - * - * @param value Enum Value - */ - DatanodeStates(int value) { - this.value = value; - } - - /** - * Returns the first State. - * - * @return First State. - */ - public static DatanodeStates getInitState() { - return INIT; - } - - /** - * The last state of endpoint states. - * - * @return last state. - */ - public static DatanodeStates getLastState() { - return SHUTDOWN; - } - - /** - * returns the numeric value associated with the endPoint. - * - * @return int. 
- */ - public int getValue() { - return value; - } - - /** - * Returns the next logical state that endPoint should move to. This - * function assumes the States are sequentially numbered. - * - * @return NextState. - */ - public DatanodeStates getNextState() { - if (this.value < getLastState().getValue()) { - int stateValue = this.getValue() + 1; - for (DatanodeStates iter : values()) { - if (stateValue == iter.getValue()) { - return iter; - } - } - } - return getLastState(); - } - } - - /** - * Start datanode state machine as a single thread daemon. - */ - public void startDaemon() { - Runnable startStateMachineTask = () -> { - try { - start(); - LOG.info("Ozone container server started."); - } catch (Exception ex) { - LOG.error("Unable to start the DatanodeState Machine", ex); - } - }; - stateMachineThread = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Datanode State Machine Thread - %d") - .build().newThread(startStateMachineTask); - stateMachineThread.start(); - } - - /** - * Stop the daemon thread of the datanode state machine. - */ - public synchronized void stopDaemon() { - try { - context.setState(DatanodeStates.SHUTDOWN); - this.close(); - LOG.info("Ozone container server stopped."); - } catch (IOException e) { - LOG.error("Stop ozone container server failed.", e); - } - } - - /** - * - * Check if the datanode state machine daemon is stopped. - * - * @return True if datanode state machine daemon is stopped - * and false otherwise. - */ - @VisibleForTesting - public boolean isDaemonStopped() { - return this.executorService.isShutdown() - && this.getContext().getExecutionCount() == 0 - && this.getContext().getState() == DatanodeStates.SHUTDOWN; - } - - /** - * Create a command handler thread. - * - * @param config - */ - private void initCommandHandlerThread(Configuration config) { - - /** - * Task that periodically checks if we have any outstanding commands. - * It is assumed that commands can be processed slowly and in order. 
- * This assumption might change in future. Right now due to this assumption - * we have single command queue process thread. - */ - Runnable processCommandQueue = () -> { - long now; - while (getContext().getState() != DatanodeStates.SHUTDOWN) { - SCMCommand command = getContext().getNextCommand(); - if (command != null) { - commandDispatcher.handle(command); - commandsHandled++; - } else { - try { - // Sleep till the next HB + 1 second. - now = Time.monotonicNow(); - if (nextHB.get() > now) { - Thread.sleep((nextHB.get() - now) + 1000L); - } - } catch (InterruptedException e) { - // Ignore this exception. - } - } - } - }; - - // We will have only one thread for command processing in a datanode. - cmdProcessThread = getCommandHandlerThread(processCommandQueue); - cmdProcessThread.start(); - } - - private Thread getCommandHandlerThread(Runnable processCommandQueue) { - Thread handlerThread = new Thread(processCommandQueue); - handlerThread.setDaemon(true); - handlerThread.setName("Command processor thread"); - handlerThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> { - // Let us just restart this thread after logging a critical error. - // if this thread is not running we cannot handle commands from SCM. - LOG.error("Critical Error : Command processor thread encountered an " + - "error. Thread: {}", t.toString(), e); - getCommandHandlerThread(processCommandQueue).start(); - }); - return handlerThread; - } - - /** - * Returns the number of commands handled by the datanode. - * @return count - */ - @VisibleForTesting - public long getCommandHandled() { - return commandsHandled; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java deleted file mode 100644 index 61bc91e..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.protocol.VersionResponse; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerDatanodeProtocolClientSideTranslatorPB; - -import static org.apache.hadoop.ozone.scm.HdslServerUtil.getLogWarnInterval; -import static org.apache.hadoop.ozone.scm.HdslServerUtil - .getScmHeartbeatInterval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.time.ZonedDateTime; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Endpoint is used as holder class that keeps state around the RPC endpoint. - */ -public class EndpointStateMachine - implements Closeable, EndpointStateMachineMBean { - static final Logger - LOG = LoggerFactory.getLogger(EndpointStateMachine.class); - private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint; - private final AtomicLong missedCount; - private final InetSocketAddress address; - private final Lock lock; - private final Configuration conf; - private EndPointStates state; - private VersionResponse version; - private ZonedDateTime lastSuccessfulHeartbeat; - - /** - * Constructs RPC Endpoints. - * - * @param endPoint - RPC endPoint. - */ - public EndpointStateMachine(InetSocketAddress address, - StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint, - Configuration conf) { - this.endPoint = endPoint; - this.missedCount = new AtomicLong(0); - this.address = address; - state = EndPointStates.getInitState(); - lock = new ReentrantLock(); - this.conf = conf; - } - - /** - * Takes a lock on this EndPoint so that other threads don't use this while we - * are trying to communicate via this endpoint. 
- */ - public void lock() { - lock.lock(); - } - - /** - * Unlocks this endpoint. - */ - public void unlock() { - lock.unlock(); - } - - /** - * Returns the version that we read from the server if anyone asks . - * - * @return - Version Response. - */ - public VersionResponse getVersion() { - return version; - } - - /** - * Sets the Version reponse we recieved from the SCM. - * - * @param version VersionResponse - */ - public void setVersion(VersionResponse version) { - this.version = version; - } - - /** - * Returns the current State this end point is in. - * - * @return - getState. - */ - public EndPointStates getState() { - return state; - } - - @Override - public int getVersionNumber() { - if (version != null) { - return version.getProtobufMessage().getSoftwareVersion(); - } else { - return -1; - } - } - - /** - * Sets the endpoint state. - * - * @param epState - end point state. - */ - public EndPointStates setState(EndPointStates epState) { - this.state = epState; - return this.state; - } - - /** - * Closes the connection. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - if (endPoint != null) { - endPoint.close(); - } - } - - /** - * We maintain a count of how many times we missed communicating with a - * specific SCM. This is not made atomic since the access to this is always - * guarded by the read or write lock. That is, it is serialized. - */ - public void incMissed() { - this.missedCount.incrementAndGet(); - } - - /** - * Returns the value of the missed count. - * - * @return int - */ - public long getMissedCount() { - return this.missedCount.get(); - } - - @Override - public String getAddressString() { - return getAddress().toString(); - } - - public void zeroMissedCount() { - this.missedCount.set(0); - } - - /** - * Returns the InetAddress of the endPoint. - * - * @return - EndPoint. - */ - public InetSocketAddress getAddress() { - return this.address; - } - - /** - * Returns real RPC endPoint. 
- * - * @return rpc client. - */ - public StorageContainerDatanodeProtocolClientSideTranslatorPB - getEndPoint() { - return endPoint; - } - - /** - * Returns the string that represents this endpoint. - * - * @return - String - */ - public String toString() { - return address.toString(); - } - - /** - * Logs exception if needed. - * @param ex - Exception - */ - public void logIfNeeded(Exception ex) { - LOG.trace("Incrementing the Missed count. Ex : {}", ex); - this.incMissed(); - if (this.getMissedCount() % getLogWarnInterval(conf) == - 0) { - LOG.warn("Unable to communicate to SCM server at {}. We have not been " + - "able to communicate to this SCM server for past {} seconds.", - this.getAddress().getHostString() + ":" + this.getAddress().getPort(), - this.getMissedCount() * getScmHeartbeatInterval( - this.conf)); - } - } - - - /** - * States that an Endpoint can be in. - * <p> - * This is a sorted list of states that EndPoint will traverse. - * <p> - * GetNextState will move this enum from getInitState to getLastState. - */ - public enum EndPointStates { - GETVERSION(1), - REGISTER(2), - HEARTBEAT(3), - SHUTDOWN(4); // if you add value after this please edit getLastState too. - private final int value; - - /** - * Constructs endPointStates. - * - * @param value state. - */ - EndPointStates(int value) { - this.value = value; - } - - /** - * Returns the first State. - * - * @return First State. - */ - public static EndPointStates getInitState() { - return GETVERSION; - } - - /** - * The last state of endpoint states. - * - * @return last state. - */ - public static EndPointStates getLastState() { - return SHUTDOWN; - } - - /** - * returns the numeric value associated with the endPoint. - * - * @return int. - */ - public int getValue() { - return value; - } - - /** - * Returns the next logical state that endPoint should move to. - * The next state is computed by adding 1 to the current state. - * - * @return NextState. 
- */ - public EndPointStates getNextState() { - if (this.getValue() < getLastState().getValue()) { - int stateValue = this.getValue() + 1; - for (EndPointStates iter : values()) { - if (stateValue == iter.getValue()) { - return iter; - } - } - } - return getLastState(); - } - } - - public long getLastSuccessfulHeartbeat() { - return lastSuccessfulHeartbeat == null ? - 0 : - lastSuccessfulHeartbeat.toEpochSecond(); - } - - public void setLastSuccessfulHeartbeat( - ZonedDateTime lastSuccessfulHeartbeat) { - this.lastSuccessfulHeartbeat = lastSuccessfulHeartbeat; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java deleted file mode 100644 index 4f64bde..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; - - -/** - * JMX representation of an EndpointStateMachine. - */ -public interface EndpointStateMachineMBean { - - long getMissedCount(); - - String getAddressString(); - - EndpointStateMachine.EndPointStates getState(); - - int getVersionNumber(); - - long getLastSuccessfulHeartbeat(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java deleted file mode 100644 index c9f83c6..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerDatanodeProtocolClientSideTranslatorPB; -import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; -import org.apache.hadoop.security.UserGroupInformation; - -import static org.apache.hadoop.ozone.scm.HdslServerUtil - .getScmRpcTimeOutInMilliseconds; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.ObjectName; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.*; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * SCMConnectionManager - Acts as a class that manages the membership - * information of the SCMs that we are working with. 
- */ -public class SCMConnectionManager - implements Closeable, SCMConnectionManagerMXBean { - private static final Logger LOG = - LoggerFactory.getLogger(SCMConnectionManager.class); - - private final ReadWriteLock mapLock; - private final Map<InetSocketAddress, EndpointStateMachine> scmMachines; - - private final int rpcTimeout; - private final Configuration conf; - private final ObjectName jmxBean; - - public SCMConnectionManager(Configuration conf) { - this.mapLock = new ReentrantReadWriteLock(); - Long timeOut = getScmRpcTimeOutInMilliseconds(conf); - this.rpcTimeout = timeOut.intValue(); - this.scmMachines = new HashMap<>(); - this.conf = conf; - jmxBean = MBeans.register("OzoneDataNode", - "SCMConnectionManager", - this); - } - - - /** - * Returns Config. - * - * @return ozoneConfig. - */ - public Configuration getConf() { - return conf; - } - - /** - * Get RpcTimeout. - * - * @return - Return RPC timeout. - */ - public int getRpcTimeout() { - return rpcTimeout; - } - - - /** - * Takes a read lock. - */ - public void readLock() { - this.mapLock.readLock().lock(); - } - - /** - * Releases the read lock. - */ - public void readUnlock() { - this.mapLock.readLock().unlock(); - } - - /** - * Takes the write lock. - */ - public void writeLock() { - this.mapLock.writeLock().lock(); - } - - /** - * Releases the write lock. - */ - public void writeUnlock() { - this.mapLock.writeLock().unlock(); - } - - /** - * adds a new SCM machine to the target set. - * - * @param address - Address of the SCM machine to send heatbeat to. - * @throws IOException - */ - public void addSCMServer(InetSocketAddress address) throws IOException { - writeLock(); - try { - if (scmMachines.containsKey(address)) { - LOG.warn("Trying to add an existing SCM Machine to Machines group. 
" + - "Ignoring the request."); - return; - } - RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, - ProtobufRpcEngine.class); - long version = - RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); - - StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy( - StorageContainerDatanodeProtocolPB.class, version, - address, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), getRpcTimeout()); - - StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = - new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); - - EndpointStateMachine endPoint = - new EndpointStateMachine(address, rpcClient, conf); - scmMachines.put(address, endPoint); - } finally { - writeUnlock(); - } - } - - /** - * Removes a SCM machine for the target set. - * - * @param address - Address of the SCM machine to send heatbeat to. - * @throws IOException - */ - public void removeSCMServer(InetSocketAddress address) throws IOException { - writeLock(); - try { - if (!scmMachines.containsKey(address)) { - LOG.warn("Trying to remove a non-existent SCM machine. " + - "Ignoring the request."); - return; - } - - EndpointStateMachine endPoint = scmMachines.get(address); - endPoint.close(); - scmMachines.remove(address); - } finally { - writeUnlock(); - } - } - - /** - * Returns all known RPCEndpoints. - * - * @return - List of RPC Endpoints. 
- */ - public Collection<EndpointStateMachine> getValues() { - return scmMachines.values(); - } - - @Override - public void close() throws IOException { - getValues().forEach(endpointStateMachine - -> IOUtils.cleanupWithLogger(LOG, endpointStateMachine)); - MBeans.unregister(jmxBean); - } - - @Override - public List<EndpointStateMachineMBean> getSCMServers() { - readLock(); - try { - return Collections - .unmodifiableList(new ArrayList<>(scmMachines.values())); - - } finally { - readUnlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java deleted file mode 100644 index 25ef163..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import java.util.List; - -/** - * JMX information about the connected SCM servers. - */ -public interface SCMConnectionManagerMXBean { - - List<EndpointStateMachineMBean> getSCMServers(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java deleted file mode 100644 index e4d6cd9..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; -import static org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.ReportState.states - .noContainerReports; - -
/**
 * Current Context of State Machine.
 *
 * <p>Holds the current {@link DatanodeStateMachine.DatanodeStates}, the queue
 * of pending {@link SCMCommand}s, the latest node report and the container
 * report state. The command queue is guarded by an explicit lock and the
 * container report state accessors are {@code synchronized}.
 */
public class StateContext {
  static final Logger LOG =
      LoggerFactory.getLogger(StateContext.class);
  private final Queue<SCMCommand> commandQueue;
  private final Lock lock;
  private final DatanodeStateMachine parent;
  private final AtomicLong stateExecutionCount;
  private final Configuration conf;
  private DatanodeStateMachine.DatanodeStates state;
  private SCMNodeReport nrState;
  private ReportState reportState;
  private static final ReportState DEFAULT_REPORT_STATE =
      ReportState.newBuilder().setState(noContainerReports).setCount(0).build();

  /**
   * Constructs a StateContext.
   *
   * @param conf   - Configuration
   * @param state  - Initial state
   * @param parent - Parent state machine
   */
  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
      state, DatanodeStateMachine parent) {
    this.conf = conf;
    this.state = state;
    this.parent = parent;
    commandQueue = new LinkedList<>();
    lock = new ReentrantLock();
    stateExecutionCount = new AtomicLong(0);
    nrState = SCMNodeReport.getDefaultInstance();
  }

  /**
   * Returns the DatanodeStateMachine that owns this context.
   *
   * @return parent DatanodeStateMachine.
   */
  public DatanodeStateMachine getParent() {
    return parent;
  }

  /**
   * Get the container server port.
   *
   * @return The container server port, or {@code INVALID_PORT} when this
   * context has no parent state machine.
   */
  public int getContainerPort() {
    return parent == null ?
        INVALID_PORT : parent.getContainer().getContainerServerPort();
  }

  /**
   * Gets the Ratis Port.
   *
   * @return The Ratis container server port, or {@code INVALID_PORT} when
   * this context has no parent state machine.
   */
  public int getRatisPort() {
    return parent == null ?
        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
  }

  /**
   * Returns true if we are entering a new state, i.e. the current state has
   * not been executed yet.
   *
   * @return boolean
   */
  boolean isEntering() {
    return stateExecutionCount.get() == 0;
  }

  /**
   * Returns true if we are exiting from the current state. As a side effect
   * the execution count is reset to zero when the answer is true.
   *
   * @param newState - newState.
   * @return boolean
   */
  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
    if (isExiting) {
      stateExecutionCount.set(0);
    }
    return isExiting;
  }

  /**
   * Returns the current state the machine is in.
   *
   * @return state.
   */
  public DatanodeStateMachine.DatanodeStates getState() {
    return state;
  }

  /**
   * Sets the current state of the machine.
   *
   * @param state state.
   */
  public void setState(DatanodeStateMachine.DatanodeStates state) {
    this.state = state;
  }

  /**
   * Returns the node report of the datanode state context.
   *
   * @return the node report.
   */
  public SCMNodeReport getNodeReport() {
    return nrState;
  }

  /**
   * Sets the node report of the datanode state context. Despite its name this
   * method does not touch the container {@link ReportState}; see
   * {@link #setContainerReportState(ReportState)} for that.
   *
   * @param nrReport - node report
   */
  public void setReportState(SCMNodeReport nrReport) {
    this.nrState = nrReport;
  }

  /**
   * Returns the next task to get executed by the datanode state machine.
   *
   * @return A task for the current state, or null when the state is SHUTDOWN.
   * @throws IllegalArgumentException if the current state has no task
   * implementation.
   */
  @SuppressWarnings("unchecked")
  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
    switch (this.state) {
    case INIT:
      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
          this);
    case RUNNING:
      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
          this);
    case SHUTDOWN:
      return null;
    default:
      throw new IllegalArgumentException("Not Implemented yet.");
    }
  }

  /**
   * Executes the task of the current state and applies any resulting state
   * transition.
   *
   * @param service - Executor Service
   * @param time    - how long to wait for the task
   * @param unit    - unit of {@code time}
   * @throws InterruptedException
   * @throws ExecutionException
   * @throws TimeoutException
   */
  public void execute(ExecutorService service, long time, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    // Sample "entering" before bumping the counter: isEntering() tests for a
    // zero count, so checking it after the increment could never be true and
    // onEnter() would never run.
    final boolean entering = isEntering();
    stateExecutionCount.incrementAndGet();
    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
    if (task == null) {
      // getTask() returns null for SHUTDOWN; there is nothing to execute and
      // dereferencing the task would throw a NullPointerException.
      LOG.debug("No task to execute in state {}", this.state);
      return;
    }
    if (entering) {
      task.onEnter();
    }
    task.execute(service);
    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
    if (this.state != newState) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Task {} executed, state transited from {} to {}",
            task.getClass().getSimpleName(), this.state, newState);
      }
      if (isExiting(newState)) {
        task.onExit();
      }
      this.clearReportState();
      this.setState(newState);
    }
  }

  /**
   * Returns the next command or null if it is empty.
   *
   * @return SCMCommand or Null.
   */
  public SCMCommand getNextCommand() {
    lock.lock();
    try {
      return commandQueue.poll();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Adds a command to the State Machine queue.
   *
   * @param command - SCMCommand.
   */
  public void addCommand(SCMCommand command) {
    lock.lock();
    try {
      commandQueue.add(command);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the number of times the current state has been executed.
   *
   * @return long
   */
  public long getExecutionCount() {
    return stateExecutionCount.get();
  }

  /**
   * Gets the ReportState.
   *
   * @return the ReportState that was set, or a default "no container
   * reports" state with a count of zero when none is set.
   */
  public synchronized ReportState getContainerReportState() {
    if (reportState == null) {
      return DEFAULT_REPORT_STATE;
    }
    return reportState;
  }

  /**
   * Sets the ReportState.
   *
   * @param rState - ReportState.
   */
  public synchronized void setContainerReportState(ReportState rState) {
    this.reportState = rState;
  }

  /**
   * Clears report state after it has been communicated.
   */
  public synchronized void clearReportState() {
    if (reportState != null) {
      setContainerReportState(null);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java deleted file mode 100644 index d5df699..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java +++ /dev/null @@ -1,238 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.container.common.statemachine.background; - -import com.google.common.collect.Lists; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BackgroundService; -import org.apache.hadoop.utils.BackgroundTaskResult; -import org.apache.hadoop.utils.BackgroundTaskQueue; -import org.apache.hadoop.utils.BackgroundTask; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.List; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT; - -
/**
 * A per-datanode background service in charge of deleting stale ozone blocks.
 *
 * <p>On every interval it asks the {@link ContainerManager} for a bounded
 * number of containers and queues one {@code BlockDeletingTask} per
 * container. Each task removes the chunk files of a bounded number of blocks
 * whose keys carry the deleting prefix and then drops those keys from the
 * container's metadata store.
 */
public class BlockDeletingService extends BackgroundService {

  private static final Logger LOG =
      LoggerFactory.getLogger(BlockDeletingService.class);

  private final ContainerManager containerManager;
  private final Configuration conf;

  // Throttle number of blocks to delete per task,
  // set to 1 for testing
  private final int blockLimitPerTask;

  // Throttle the number of containers to process concurrently at a time,
  private final int containerLimitPerInterval;

  // Task priority is useful when a to-delete block has weight.
  private static final int TASK_PRIORITY_DEFAULT = 1;
  // Core pool size for container tasks
  private static final int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;

  /**
   * Creates the service and reads both throttling limits from the
   * configuration.
   *
   * @param containerManager - source of containers and pending-deletion
   *                         bookkeeping.
   * @param serviceInterval  - interval between runs, passed to the base
   *                         service with a unit of milliseconds.
   * @param serviceTimeout   - timeout handed to the base service.
   * @param conf             - configuration to read the limits from.
   */
  public BlockDeletingService(ContainerManager containerManager,
      long serviceInterval, long serviceTimeout, Configuration conf) {
    super("BlockDeletingService", serviceInterval,
        TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
        serviceTimeout);
    this.containerManager = containerManager;
    this.conf = conf;
    this.blockLimitPerTask = conf.getInt(
        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
    this.containerLimitPerInterval = conf.getInt(
        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
  }

  /**
   * Builds the queue of block deleting tasks for this interval.
   *
   * @return a queue holding one task per chosen container; the queue is empty
   * when choosing the containers fails.
   */
  @Override
  public BackgroundTaskQueue getTasks() {
    BackgroundTaskQueue queue = new BackgroundTaskQueue();
    List<ContainerData> containers = Lists.newArrayList();
    try {
      // We at most list a number of containers a time,
      // in case there are too many containers and start too many workers.
      // We must ensure there is no empty container in this result.
      // The chosen result depends on what container deletion policy is
      // configured.
      containers = containerManager.chooseContainerForBlockDeletion(
          containerLimitPerInterval);
      LOG.info("Plan to choose {} containers for block deletion, "
          + "actually returns {} valid containers.",
          containerLimitPerInterval, containers.size());

      for (ContainerData container : containers) {
        BlockDeletingTask containerTask =
            new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
        queue.add(containerTask);
      }
    } catch (StorageContainerException e) {
      LOG.warn("Failed to initiate block deleting tasks, "
          + "caused by unable to get containers info. "
          + "Retry in next interval. ", e);
    } catch (Exception e) {
      // In case listContainer call throws any uncaught RuntimeException.
      // Logged at warn rather than debug so that an unexpected failure is
      // not silently lost when debug logging is off.
      LOG.warn("Unexpected error occurs during deleting blocks.", e);
    }
    return queue;
  }

  /**
   * Result of one block deleting task: the names of the deleted blocks.
   */
  private static class ContainerBackgroundTaskResult
      implements BackgroundTaskResult {
    private final List<String> deletedBlockIds;

    ContainerBackgroundTaskResult() {
      deletedBlockIds = new LinkedList<>();
    }

    public void addBlockId(String blockId) {
      deletedBlockIds.add(blockId);
    }

    public void addAll(List<String> blockIds) {
      deletedBlockIds.addAll(blockIds);
    }

    public List<String> getDeletedBlocks() {
      return deletedBlockIds;
    }

    @Override
    public int getSize() {
      return deletedBlockIds.size();
    }
  }

  /**
   * Task that deletes a bounded number of pending-deletion blocks from a
   * single container.
   */
  private class BlockDeletingTask
      implements BackgroundTask<BackgroundTaskResult> {

    private final int priority;
    private final ContainerData containerData;

    BlockDeletingTask(ContainerData containerData, int priority) {
      this.priority = priority;
      this.containerData = containerData;
    }

    /**
     * Deletes the chunk files of up to {@code blockLimitPerTask} blocks and
     * then removes the corresponding keys from the container DB.
     *
     * @return result listing the blocks whose metadata was parsed and whose
     * keys were removed; empty when the container data directory is invalid.
     */
    @Override
    public BackgroundTaskResult call() throws Exception {
      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
      long startTime = Time.monotonicNow();
      // Scan container's db and get list of under deletion blocks
      MetadataStore meta = KeyUtils.getDB(containerData, conf);
      // # of blocks to delete is throttled
      KeyPrefixFilter filter = new KeyPrefixFilter(
          OzoneConsts.DELETING_KEY_PREFIX);
      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
      if (toDeleteBlocks.isEmpty()) {
        LOG.debug("No under deletion block found in container : {}",
            containerData.getContainerName());
      }

      List<String> succeedBlocks = new LinkedList<>();
      LOG.debug("Container : {}, To-Delete blocks : {}",
          containerData.getContainerName(), toDeleteBlocks.size());
      File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
      if (!dataDir.exists() || !dataDir.isDirectory()) {
        LOG.error("Invalid container data dir {} : "
            + "not exist or not a directory", dataDir.getAbsolutePath());
        return crr;
      }

      toDeleteBlocks.forEach(entry -> {
        String blockName = DFSUtil.bytes2String(entry.getKey());
        LOG.debug("Deleting block {}", blockName);
        try {
          ContainerProtos.KeyData data =
              ContainerProtos.KeyData.parseFrom(entry.getValue());
          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
            File chunkFile = dataDir.toPath()
                .resolve(chunkInfo.getChunkName()).toFile();
            // NOTE(review): a chunk that fails to delete is not reported and
            // the block is still counted as succeeded below - confirm that
            // this best-effort behaviour is intended.
            if (FileUtils.deleteQuietly(chunkFile)) {
              LOG.debug("block {} chunk {} deleted", blockName,
                  chunkFile.getAbsolutePath());
            }
          }
          succeedBlocks.add(blockName);
        } catch (InvalidProtocolBufferException e) {
          LOG.error("Failed to parse block info for block {}", blockName, e);
        }
      });

      // Once files are deleted ... clean up DB
      BatchOperation batch = new BatchOperation();
      succeedBlocks.forEach(entry ->
          batch.delete(DFSUtil.string2Bytes(entry)));
      meta.writeBatch(batch);
      // update count of pending deletion blocks in in-memory container status
      containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
          containerData.getContainerName());

      if (!succeedBlocks.isEmpty()) {
        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
            containerData.getContainerName(), succeedBlocks.size(),
            Time.monotonicNow() - startTime);
      }
      crr.addAll(succeedBlocks);
      return crr;
    }

    @Override
    public int getPriority() {
      return priority;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java deleted file mode 100644 index a9e202e..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.background; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java deleted file mode 100644 index e872555..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; -import org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -
/**
 * Handler for the close container command sent by SCM.
 */
public class CloseContainerHandler implements CommandHandler {
  static final Logger LOG =
      LoggerFactory.getLogger(CloseContainerHandler.class);
  private int invocationCount;
  private long totalTime;

  /**
   * Constructs a close container handler.
   */
  public CloseContainerHandler() {
  }

  /**
   * Parses the container name out of the command and asks the container
   * manager to close that container. Failures are logged, not rethrown.
   *
   * @param command           - SCM Command
   * @param container         - Ozone Container.
   * @param context           - Current Context.
   * @param connectionManager - The SCMs that we are talking to.
   */
  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    LOG.debug("Processing Close Container command.");
    invocationCount++;
    final long begin = Time.monotonicNow();
    // Placeholder used in the error message when parsing fails before the
    // real name is known.
    String containerName = "UNKNOWN";
    try {
      SCMCloseContainerCmdResponseProto closeProto =
          SCMCloseContainerCmdResponseProto.parseFrom(
              command.getProtoBufMessage());
      containerName = closeProto.getContainerName();
      container.getContainerManager().closeContainer(containerName);
    } catch (Exception e) {
      LOG.error("Can't close container " + containerName, e);
    } finally {
      totalTime += Time.monotonicNow() - begin;
    }
  }

  /**
   * Returns the command type that this command handler handles.
   *
   * @return {@code SCMCmdType.closeContainerCommand}
   */
  @Override
  public SCMCmdType getCommandType() {
    return SCMCmdType.closeContainerCommand;
  }

  /**
   * Returns number of times this handler has been invoked.
   *
   * @return int
   */
  @Override
  public int getInvocationCount() {
    return invocationCount;
  }

  /**
   * Returns the average time spent per invocation of {@code handle}.
   *
   * @return total time divided by invocation count, or 0 before the first
   * invocation.
   */
  @Override
  public long getAverageRunTime() {
    if (invocationCount <= 0) {
      return 0;
    }
    return totalTime / invocationCount;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java deleted file mode 100644 index fee3e1c..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -
/**
 * Dispatches command to the correct handler.
 *
 * <p>Handlers are registered once, at construction time, keyed by the command
 * type they report through {@link CommandHandler#getCommandType()}.
 */
public final class CommandDispatcher {
  static final Logger LOG =
      LoggerFactory.getLogger(CommandDispatcher.class);
  private final StateContext context;
  private final Map<SCMCmdType, CommandHandler> handlerMap;
  private final OzoneContainer container;
  private final SCMConnectionManager connectionManager;

  /**
   * Constructs a command dispatcher.
   *
   * @param container         - Ozone Container
   * @param connectionManager - Connection manager handed to every handler
   * @param context           - Context
   * @param handlers          - Set of handlers; must be non-empty and hold at
   *                          most one handler per command type.
   * @throws IllegalArgumentException if two handlers report the same command
   * type.
   */
  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
      connectionManager, StateContext context,
      CommandHandler... handlers) {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(handlers);
    Preconditions.checkArgument(handlers.length > 0);
    Preconditions.checkNotNull(container);
    Preconditions.checkNotNull(connectionManager);
    this.context = context;
    this.container = container;
    this.connectionManager = connectionManager;
    handlerMap = new HashMap<>();
    for (CommandHandler h : handlers) {
      if (handlerMap.containsKey(h.getCommandType())) {
        // The placeholder must be exactly "{}"; the former "{ }" is not
        // recognised by SLF4J, so the command name was never logged.
        LOG.error("Duplicate handler for the same command. Exiting. Handle " +
            "key : {}", h.getCommandType().getDescriptorForType().getName());
        throw new IllegalArgumentException("Duplicate handler for the same " +
            "command.");
      }
      handlerMap.put(h.getCommandType(), h);
    }
  }

  /**
   * Dispatch the command to the correct handler. A command without a
   * registered handler is logged as an error and otherwise ignored.
   *
   * @param command - SCM Command.
   */
  public void handle(SCMCommand command) {
    Preconditions.checkNotNull(command);
    CommandHandler handler = handlerMap.get(command.getType());
    if (handler != null) {
      handler.handle(command, container, context, connectionManager);
    } else {
      LOG.error("Unknown SCM Command queued. There is no handler for this " +
          "command. Command: {}", command.getType().getDescriptorForType()
          .getName());
    }
  }

  /**
   * Creates a new, empty builder.
   *
   * @return Builder
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Helper class to construct command dispatcher.
   */
  public static class Builder {
    private final List<CommandHandler> handlerList;
    private OzoneContainer container;
    private StateContext context;
    private SCMConnectionManager connectionManager;

    public Builder() {
      handlerList = new LinkedList<>();
    }

    /**
     * Adds a handler.
     *
     * @param handler - handler
     * @return Builder
     */
    public Builder addHandler(CommandHandler handler) {
      Preconditions.checkNotNull(handler);
      handlerList.add(handler);
      return this;
    }

    /**
     * Add the OzoneContainer.
     *
     * @param ozoneContainer - ozone container.
     * @return Builder
     */
    public Builder setContainer(OzoneContainer ozoneContainer) {
      Preconditions.checkNotNull(ozoneContainer);
      this.container = ozoneContainer;
      return this;
    }

    /**
     * Set the Connection Manager.
     *
     * @param scmConnectionManager - connection manager
     * @return this
     */
    public Builder setConnectionManager(SCMConnectionManager
        scmConnectionManager) {
      Preconditions.checkNotNull(scmConnectionManager);
      this.connectionManager = scmConnectionManager;
      return this;
    }

    /**
     * Sets the Context.
     *
     * @param stateContext - StateContext
     * @return this
     */
    public Builder setContext(StateContext stateContext) {
      Preconditions.checkNotNull(stateContext);
      this.context = stateContext;
      return this;
    }

    /**
     * Builds a command Dispatcher. The connection manager, container, context
     * and at least one handler must have been supplied.
     *
     * @return Command Dispatcher.
     */
    public CommandDispatcher build() {
      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
          " manager.");
      Preconditions.checkNotNull(this.container, "Missing container.");
      Preconditions.checkNotNull(this.context, "Missing context.");
      Preconditions.checkArgument(this.handlerList.size() > 0);
      return new CommandDispatcher(this.container, this.connectionManager,
          this.context, handlerList.toArray(
              new CommandHandler[handlerList.size()]));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java deleted file mode 100644 index b54923e..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; - -
/**
 * Contract implemented by every handler of an SCM command.
 */
public interface CommandHandler {

  /**
   * Processes one SCM command.
   *
   * @param command           - SCM Command
   * @param container         - Ozone Container.
   * @param context           - Current Context.
   * @param connectionManager - The SCMs that we are talking to.
   */
  void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager);

  /**
   * Tells which command type this handler is responsible for.
   *
   * @return Type
   */
  SCMCmdType getCommandType();

  /**
   * Reports how many times this handler has been invoked.
   *
   * @return int
   */
  int getInvocationCount();

  /**
   * Reports the average time one invocation of this handler takes.
   *
   * @return long
   */
  long getAverageRunTime();

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java deleted file mode 100644 index e9f4b61..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Container Report handler. - */ -public class ContainerReportHandler implements CommandHandler { - static final Logger LOG = - LoggerFactory.getLogger(ContainerReportHandler.class); - private int invocationCount; - private long totalTime; - - /** - * Constructs a ContainerReport handler. - */ - public ContainerReportHandler() { - } - - /** - * Handles a given SCM command. - * - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. - */ - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Container Report."); - invocationCount++; - long startTime = Time.monotonicNow(); - try { - ContainerReportsRequestProto contianerReport = - container.getContainerReport(); - - // TODO : We send this report to all SCMs.Check if it is enough only to - // send to the leader once we have RAFT enabled SCMs. 
- for (EndpointStateMachine endPoint : connectionManager.getValues()) { - endPoint.getEndPoint().sendContainerReport(contianerReport); - } - } catch (IOException ex) { - LOG.error("Unable to process the Container Report command.", ex); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Returns the command type that this command handler handles. - * - * @return Type - */ - @Override - public SCMCmdType getCommandType() { - return SCMCmdType.sendContainerReport; - } - - /** - * Returns number of times this handler has been invoked. - * - * @return int - */ - @Override - public int getInvocationCount() { - return invocationCount; - } - - /** - * Returns the average time this function takes to run. - * - * @return long - */ - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java deleted file mode 100644 index ff38cdc..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. 
The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import 
org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -/** - * Handle block deletion commands. - */ -public class DeleteBlocksCommandHandler implements CommandHandler { - - private static final Logger LOG = - LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); - - private ContainerManager containerManager; - private Configuration conf; - private int invocationCount; - private long totalTime; - - public DeleteBlocksCommandHandler(ContainerManager containerManager, - Configuration conf) { - this.containerManager = containerManager; - this.conf = conf; - } - - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - if (command.getType() != SCMCmdType.deleteBlocksCommand) { - LOG.warn("Skipping handling command, expected command " - + "type {} but found {}", - SCMCmdType.deleteBlocksCommand, command.getType()); - return; - } - LOG.debug("Processing block deletion command."); - invocationCount++; - long startTime = Time.monotonicNow(); - - // move blocks to deleting state. - // this is a metadata update, the actual deletion happens in another - // recycling thread. 
- DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; - List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted(); - - - DeletedContainerBlocksSummary summary = - DeletedContainerBlocksSummary.getFrom(containerBlocks); - LOG.info("Start to delete container blocks, TXIDs={}, " - + "numOfContainers={}, numOfBlocks={}", - summary.getTxIDSummary(), - summary.getNumOfContainers(), - summary.getNumOfBlocks()); - - ContainerBlocksDeletionACKProto.Builder resultBuilder = - ContainerBlocksDeletionACKProto.newBuilder(); - containerBlocks.forEach(entry -> { - DeleteBlockTransactionResult.Builder txResultBuilder = - DeleteBlockTransactionResult.newBuilder(); - txResultBuilder.setTxID(entry.getTxID()); - try { - deleteContainerBlocks(entry, conf); - txResultBuilder.setSuccess(true); - } catch (IOException e) { - LOG.warn("Failed to delete blocks for container={}, TXID={}", - entry.getContainerName(), entry.getTxID(), e); - txResultBuilder.setSuccess(false); - } - resultBuilder.addResults(txResultBuilder.build()); - }); - ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build(); - - // Send ACK back to SCM as long as meta updated - // TODO Or we should wait until the blocks are actually deleted? - if (!containerBlocks.isEmpty()) { - for (EndpointStateMachine endPoint : connectionManager.getValues()) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending following block deletion ACK to SCM"); - for (DeleteBlockTransactionResult result : - blockDeletionACK.getResultsList()) { - LOG.debug(result.getTxID() + " : " + result.getSuccess()); - } - } - endPoint.getEndPoint() - .sendContainerBlocksDeletionACK(blockDeletionACK); - } catch (IOException e) { - LOG.error("Unable to send block deletion ACK to SCM {}", - endPoint.getAddress().toString(), e); - } - } - } - - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - - /** - * Move a bunch of blocks from a container to deleting state. 
- * This is a meta update, the actual deletes happen in async mode. - * - * @param delTX a block deletion transaction. - * @param config configuration. - * @throws IOException if I/O error occurs. - */ - private void deleteContainerBlocks(DeletedBlocksTransaction delTX, - Configuration config) throws IOException { - String containerId = delTX.getContainerName(); - ContainerData containerInfo = containerManager.readContainer(containerId); - if (LOG.isDebugEnabled()) { - LOG.debug("Processing Container : {}, DB path : {}", containerId, - containerInfo.getDBPath()); - } - - int newDeletionBlocks = 0; - MetadataStore containerDB = KeyUtils.getDB(containerInfo, config); - for (String blk : delTX.getBlockIDList()) { - BatchOperation batch = new BatchOperation(); - byte[] blkBytes = DFSUtil.string2Bytes(blk); - byte[] blkInfo = containerDB.get(blkBytes); - if (blkInfo != null) { - // Found the block in container db, - // use an atomic update to change its state to deleting. - batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk), - blkInfo); - batch.delete(blkBytes); - try { - containerDB.writeBatch(batch); - newDeletionBlocks++; - LOG.debug("Transited Block {} to DELETING state in container {}", - blk, containerId); - } catch (IOException e) { - // if some blocks failed to delete, we fail this TX, - // without sending this ACK to SCM, SCM will resend the TX - // with a certain number of retries. 
- throw new IOException( - "Failed to delete blocks for TXID = " + delTX.getTxID(), e); - } - } else { - LOG.debug("Block {} not found or already under deletion in" - + " container {}, skip deleting it.", blk, containerId); - } - } - - // update pending deletion blocks count in in-memory container status - containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId); - } - - @Override - public SCMCmdType getCommandType() { - return SCMCmdType.deleteBlocksCommand; - } - - @Override - public int getInvocationCount() { - return this.invocationCount; - } - - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java deleted file mode 100644 index 1e9c8dc..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java deleted file mode 100644 index feb2f81..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; -/** - - State machine class is used by the container to denote various states a - container can be in and also is used for command processing. - - Container has the following states. - - Start - > getVersion -> Register -> Running -> Shutdown - - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java deleted file mode 100644 index 75142af..0000000 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.container.common.states; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * State Interface that allows tasks to maintain states. - */ -public interface DatanodeState<T> { - /** - * Called before entering this state. - */ - void onEnter(); - - /** - * Called After exiting this state. - */ - void onExit(); - - /** - * Executes one or more tasks that is needed by this state. - * - * @param executor - ExecutorService - */ - void execute(ExecutorService executor); - - /** - * Wait for execute to finish. - * - * @param time - Time - * @param timeUnit - Unit of time. - */ - T await(long time, TimeUnit timeUnit) - throws InterruptedException, ExecutionException, TimeoutException; - -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org