http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index 6eb7ea6..e1e2909 100644 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -25,16 +25,16 @@ import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.MiniOzoneClassicCluster; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.AfterClass;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java index 8d1a865..0268ccc 100644 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java @@ -29,7 +29,7 @@ import org.junit.Test; import java.nio.file.Files; import java.nio.file.Paths; -import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; /** * Test the resource generation of Dynamic Provisioner. http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java index 8cb57d6..d7dabe3 100644 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java @@ -19,7 +19,7 @@ package org.apache.hadoop.cblock.util; import org.apache.hadoop.cblock.meta.ContainerDescriptor; import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java index 59c8e01..9fa76a8 100644 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java @@ -18,12 +18,12 @@ package org.apache.hadoop.cblock.util; import org.apache.hadoop.cblock.meta.ContainerDescriptor; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerData; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.scm.client.ScmClient; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import java.io.IOException; import java.util.ArrayList; @@ -88,7 +88,7 @@ public class MockStorageClient implements ScmClient { ContainerInfo container = new ContainerInfo.Builder() .setContainerName(containerDescriptor.getContainerID()) .setPipeline(containerDescriptor.getPipeline()) - .setState(HdslProtos.LifeCycleState.ALLOCATED) + .setState(HddsProtos.LifeCycleState.ALLOCATED) .build(); containerList.add(container); return containerList; @@ -134,8 +134,8 @@ public class MockStorageClient implements ScmClient { } @Override - public Pipeline createContainer(HdslProtos.ReplicationType type, - HdslProtos.ReplicationFactor replicationFactor, String containerId, + public Pipeline createContainer(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor replicationFactor, String containerId, String owner) throws IOException { int contId = currentContainerId.getAndIncrement(); ContainerLookUpService.addContainer(Long.toString(contId)); @@ -153,8 +153,8 @@ public class MockStorageClient implements ScmClient { * @throws IOException */ @Override - public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState> - nodeStatuses, HdslProtos.QueryScope queryScope, String poolName) + public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> + nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) throws IOException { return null; } @@ -168,8 +168,8 @@ public class MockStorageClient implements ScmClient { * @throws IOException */ @Override - public Pipeline createReplicationPipeline(HdslProtos.ReplicationType type, - HdslProtos.ReplicationFactor factor, HdslProtos.NodePool nodePool) + public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java b/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java index 224c908..c6c0e84 100644 --- a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java +++ b/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java @@ -32,7 +32,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java b/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java index f8b05ed..a3f53aa 100644 --- a/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java +++ b/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java @@ -21,7 +21,7 @@ import org.apache.hadoop.cblock.cli.CBlockCli; import org.apache.hadoop.cblock.meta.VolumeDescriptor; import org.apache.hadoop.cblock.util.MockStorageClient; import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.client.ScmClient; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.AfterClass; http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh index c3e7113..a7e15d7 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh @@ -596,8 +596,8 @@ function hadoop_bootstrap YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/yarn/lib"} MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"} MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"} - HDSL_DIR=${HDSL_DIR:-"share/hadoop/hdsl"} - HDSL_LIB_JARS_DIR=${HDSL_LIB_JARS_DIR:-"share/hadoop/hdsl/lib"} + HDDS_DIR=${HDDS_DIR:-"share/hadoop/hdds"} + HDDS_LIB_JARS_DIR=${HDDS_LIB_JARS_DIR:-"share/hadoop/hdds/lib"} OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"} OZONE_LIB_JARS_DIR=${OZONE_LIB_JARS_DIR:-"share/hadoop/ozone/lib"} CBLOCK_DIR=${CBLOCK_DIR:-"share/hadoop/cblock"} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-dist/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-dist/pom.xml b/hadoop-dist/pom.xml index 98a9edf..3003550 100644 --- a/hadoop-dist/pom.xml +++ b/hadoop-dist/pom.xml @@ -219,7 +219,7 @@ </profile> <profile> - <id>hdsl</id> + <id>hdds</id> <activation> <activeByDefault>false</activeByDefault> </activation> @@ -231,11 +231,11 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdsl-server-scm</artifactId> + <artifactId>hadoop-hdds-server-scm</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdsl-tools</artifactId> + <artifactId>hadoop-hdds-tools</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> @@ -243,7 +243,7 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdsl-container-service</artifactId> + <artifactId>hadoop-hdds-container-service</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> @@ -251,7 +251,7 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdsl-tools</artifactId> + <artifactId>hadoop-hdds-tools</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-dist/src/main/compose/cblock/docker-config ---------------------------------------------------------------------- diff --git a/hadoop-dist/src/main/compose/cblock/docker-config b/hadoop-dist/src/main/compose/cblock/docker-config index da0c2ac..4690de0 100644 --- a/hadoop-dist/src/main/compose/cblock/docker-config +++ b/hadoop-dist/src/main/compose/cblock/docker-config @@ -27,7 +27,7 @@ OZONE-SITE.XML_ozone.scm.client.address=scm OZONE-SITE.XML_dfs.cblock.jscsi.cblock.server.address=cblock OZONE-SITE.XML_dfs.cblock.scm.ipaddress=scm OZONE-SITE.XML_dfs.cblock.service.leveldb.path=/tmp -HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HdslDatanodeService +HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HddsDatanodeService HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000 HDFS-SITE.XML_dfs.namenode.name.dir=/data/namenode HDFS-SITE.XML_rpc.metrics.quantile.enable=true http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-dist/src/main/compose/ozone/docker-config ---------------------------------------------------------------------- diff --git a/hadoop-dist/src/main/compose/ozone/docker-config b/hadoop-dist/src/main/compose/ozone/docker-config index d297b19..8e5efa9 100644 --- a/hadoop-dist/src/main/compose/ozone/docker-config +++ b/hadoop-dist/src/main/compose/ozone/docker-config @@ -27,7 +27,7 @@ HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000 HDFS-SITE.XML_dfs.namenode.name.dir=/data/namenode HDFS-SITE.XML_rpc.metrics.quantile.enable=true HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 -HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HdslDatanodeService +HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HddsDatanodeService LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender LOG4J.PROPERTIES_log4j.appender.stdout.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml new file mode 100644 index 0000000..95ff09a --- /dev/null +++ b/hadoop-hdds/client/pom.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 +http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds</artifactId> + <version>3.2.0-SNAPSHOT</version> + </parent> + <artifactId>hadoop-hdds-client</artifactId> + <version>3.2.0-SNAPSHOT</version> + <description>Apache Hadoop Distributed Data Store Client libraries</description> + <name>Apache HDDS Client</name> + <packaging>jar</packaging> + + <properties> + <hadoop.component>hdds</hadoop.component> + <is.hadoop.component>true</is.hadoop.component> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java new file mode 100644 index 0000000..5c702c6 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; + +/** + * A Client for the storageContainer protocol. + */ +public class XceiverClient extends XceiverClientSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); + private final Pipeline pipeline; + private final Configuration config; + private Channel channel; + private Bootstrap b; + private EventLoopGroup group; + private final Semaphore semaphore; + + /** + * Constructs a client that can communicate with the Container framework on + * data nodes. + * + * @param pipeline - Pipeline that defines the machines. + * @param config -- Ozone Config + */ + public XceiverClient(Pipeline pipeline, Configuration config) { + super(); + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(config); + this.pipeline = pipeline; + this.config = config; + this.semaphore = + new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); + } + + @Override + public void connect() throws Exception { + if (channel != null && channel.isActive()) { + throw new IOException("This client is already connected to a host."); + } + + group = new NioEventLoopGroup(); + b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .handler(new XceiverClientInitializer(this.pipeline, semaphore)); + DatanodeDetails leader = this.pipeline.getLeader(); + + // read port from the data node, on failure use default configured + // port. + int port = leader.getContainerPort(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + } + LOG.debug("Connecting to server Port : " + port); + channel = b.connect(leader.getHostName(), port).sync().channel(); + } + + /** + * Returns if the exceiver client connects to a server. + * + * @return True if the connection is alive, false otherwise. + */ + @VisibleForTesting + public boolean isConnected() { + return channel.isActive(); + } + + @Override + public void close() { + if (group != null) { + group.shutdownGracefully().awaitUninterruptibly(); + } + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public ContainerProtos.ContainerCommandResponseProto sendCommand( + ContainerProtos.ContainerCommandRequestProto request) throws IOException { + try { + if ((channel == null) || (!channel.isActive())) { + throw new IOException("This channel is not connected."); + } + XceiverClientHandler handler = + channel.pipeline().get(XceiverClientHandler.class); + + return handler.sendCommand(request); + } catch (ExecutionException | InterruptedException e) { + /** + * In case the netty channel handler throws an exception, + * the exception thrown will be wrapped within {@link ExecutionException}. + * Unwarpping here so that original exception gets passed + * to to the client. + */ + if (e instanceof ExecutionException) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + } + throw new IOException( + "Unexpected exception during execution:" + e.getMessage()); + } + } + + /** + * Sends a given command to server gets a waitable future back. + * + * @param request Request + * @return Response to the command + * @throws IOException + */ + @Override + public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> + sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) + throws IOException, ExecutionException, InterruptedException { + if ((channel == null) || (!channel.isActive())) { + throw new IOException("This channel is not connected."); + } + XceiverClientHandler handler = + channel.pipeline().get(XceiverClientHandler.class); + return handler.sendCommandAsync(request); + } + + /** + * Create a pipeline. + * + * @param pipelineID - Name of the pipeline. + * @param datanodes - Datanodes + */ + @Override + public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes) + throws IOException { + // For stand alone pipeline, there is no notion called setup pipeline. + return; + } + + /** + * Returns pipeline Type. + * + * @return - Stand Alone as the type. + */ + @Override + public HddsProtos.ReplicationType getPipelineType() { + return HddsProtos.ReplicationType.STAND_ALONE; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java new file mode 100644 index 0000000..e2b55ac --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +/** + * Netty client handler. + */ +public class XceiverClientHandler extends + SimpleChannelInboundHandler<ContainerCommandResponseProto> { + + static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); + private final ConcurrentMap<String, ResponseFuture> responses = + new ConcurrentHashMap<>(); + + private final Pipeline pipeline; + private volatile Channel channel; + private XceiverClientMetrics metrics; + private final Semaphore semaphore; + + /** + * Constructs a client that can communicate to a container server. + */ + public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) { + super(false); + Preconditions.checkNotNull(pipeline); + this.pipeline = pipeline; + this.metrics = XceiverClientManager.getXceiverClientMetrics(); + this.semaphore = semaphore; + } + + /** + * <strong>Please keep in mind that this method will be renamed to {@code + * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong> + * <p> + * Is called for each message of type {@link ContainerProtos + * .ContainerCommandResponseProto}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link + * SimpleChannelInboundHandler} belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ + @Override + public void channelRead0(ChannelHandlerContext ctx, + ContainerProtos.ContainerCommandResponseProto msg) + throws Exception { + Preconditions.checkNotNull(msg); + metrics.decrPendingContainerOpsMetrics(msg.getCmdType()); + + String key = msg.getTraceID(); + ResponseFuture response = responses.remove(key); + semaphore.release(); + + if (response != null) { + response.getFuture().complete(msg); + + long requestTime = response.getRequestTime(); + metrics.addContainerOpsLatency(msg.getCmdType(), + Time.monotonicNowNanos() - requestTime); + } else { + LOG.error("A reply received for message that was not queued. trace " + + "ID: {}", msg.getTraceID()); + } + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + LOG.debug("channelRegistered: Connected to ctx"); + channel = ctx.channel(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.info("Exception in client " + cause.toString()); + Iterator<String> keyIterator = responses.keySet().iterator(); + while (keyIterator.hasNext()) { + ResponseFuture response = responses.remove(keyIterator.next()); + response.getFuture().completeExceptionally(cause); + semaphore.release(); + } + ctx.close(); + } + + /** + * Since netty is async, we send a work request and then wait until a response + * appears in the reply queue. This is simple sync interface for clients. we + * should consider building async interfaces for client if this turns out to + * be a performance bottleneck. + * + * @param request - request. + * @return -- response + */ + + public ContainerCommandResponseProto sendCommand( + ContainerProtos.ContainerCommandRequestProto request) + throws ExecutionException, InterruptedException { + Future<ContainerCommandResponseProto> future = sendCommandAsync(request); + return future.get(); + } + + /** + * SendCommandAsyc queues a command to the Netty Subsystem and returns a + * CompletableFuture. This Future is marked compeleted in the channelRead0 + * when the call comes back. + * @param request - Request to execute + * @return CompletableFuture + */ + public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request) + throws InterruptedException { + + // Throw an exception of request doesn't have traceId + if (StringUtils.isEmpty(request.getTraceID())) { + throw new IllegalArgumentException("Invalid trace ID"); + } + + // Setting the datanode ID in the commands, so that we can distinguish + // commands when the cluster simulator is running. + if(!request.hasDatanodeUuid()) { + throw new IllegalArgumentException("Invalid Datanode ID"); + } + + metrics.incrPendingContainerOpsMetrics(request.getCmdType()); + + CompletableFuture<ContainerCommandResponseProto> future + = new CompletableFuture<>(); + ResponseFuture response = new ResponseFuture(future, + Time.monotonicNowNanos()); + semaphore.acquire(); + ResponseFuture previous = responses.putIfAbsent( + request.getTraceID(), response); + if (previous != null) { + LOG.error("Command with Trace already exists. Ignoring this command. " + + "{}. Previous Command: {}", request.getTraceID(), + previous.toString()); + throw new IllegalStateException("Duplicate trace ID. Command with this " + + "trace ID is already executing. Please ensure that " + + "trace IDs are not reused. ID: " + request.getTraceID()); + } + + channel.writeAndFlush(request); + return response.getFuture(); + } + + /** + * Class wraps response future info. + */ + static class ResponseFuture { + private final long requestTime; + private final CompletableFuture<ContainerCommandResponseProto> future; + + ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future, + long requestTime) { + this.future = future; + this.requestTime = requestTime; + } + + public long getRequestTime() { + return requestTime; + } + + public CompletableFuture<ContainerCommandResponseProto> getFuture() { + return future; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java new file mode 100644 index 0000000..e10a9f6 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; + +import java.util.concurrent.Semaphore; + +/** + * Setup the netty pipeline. + */ +public class XceiverClientInitializer extends + ChannelInitializer<SocketChannel> { + private final Pipeline pipeline; + private final Semaphore semaphore; + + /** + * Constructs an Initializer for the client pipeline. + * @param pipeline - Pipeline. + */ + public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) { + this.pipeline = pipeline; + this.semaphore = semaphore; + } + + /** + * This method will be called once when the Channel is registered. After + * the method returns this instance will be removed from the + * ChannelPipeline of the Channel. + * + * @param ch Channel which was registered. + * @throws Exception is thrown if an error occurs. In that case the + * Channel will be closed. + */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + + p.addLast(new ProtobufVarint32FrameDecoder()); + p.addLast(new ProtobufDecoder(ContainerProtos + .ContainerCommandResponseProto.getDefaultInstance())); + + p.addLast(new ProtobufVarint32LengthFieldPrepender()); + p.addLast(new ProtobufEncoder()); + + p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore)); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java new file mode 100644 index 0000000..7585104 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationType.RATIS; + +/** + * XceiverClientManager is responsible for the lifecycle of XceiverClient + * instances. Callers use this class to acquire an XceiverClient instance + * connected to the desired container pipeline. When done, the caller also uses + * this class to release the previously acquired XceiverClient instance. + * + * + * This class caches connection to container for reuse purpose, such that + * accessing same container frequently will be through the same connection + * without reestablishing connection. But the connection will be closed if + * not being used for a period of time. + */ +public class XceiverClientManager implements Closeable { + + //TODO : change this to SCM configuration class + private final Configuration conf; + private final Cache<String, XceiverClientSpi> clientCache; + private final boolean useRatis; + + private static XceiverClientMetrics metrics; + /** + * Creates a new XceiverClientManager. + * + * @param conf configuration + */ + public XceiverClientManager(Configuration conf) { + Preconditions.checkNotNull(conf); + int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, + SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT); + long staleThresholdMs = conf.getTimeDuration( + SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY, + SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS); + this.useRatis = conf.getBoolean( + ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + this.conf = conf; + this.clientCache = CacheBuilder.newBuilder() + .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) + .maximumSize(maxSize) + .removalListener( + new RemovalListener<String, XceiverClientSpi>() { + @Override + public void onRemoval( + RemovalNotification<String, XceiverClientSpi> + removalNotification) { + synchronized (clientCache) { + // Mark the entry as evicted + XceiverClientSpi info = removalNotification.getValue(); + info.setEvicted(); + } + } + }).build(); + } + + @VisibleForTesting + public Cache<String, XceiverClientSpi> getClientCache() { + return clientCache; + } + + /** + * Acquires a XceiverClientSpi connected to a container capable of + * storing the specified key. + * + * If there is already a cached XceiverClientSpi, simply return + * the cached otherwise create a new one. + * + * @param pipeline the container pipeline for the client connection + * @return XceiverClientSpi connected to a container + * @throws IOException if a XceiverClientSpi cannot be acquired + */ + public XceiverClientSpi acquireClient(Pipeline pipeline) + throws IOException { + Preconditions.checkNotNull(pipeline); + Preconditions.checkArgument(pipeline.getMachines() != null); + Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); + + synchronized (clientCache) { + XceiverClientSpi info = getClient(pipeline); + info.incrementReference(); + return info; + } + } + + /** + * Releases a XceiverClientSpi after use. + * + * @param client client to release + */ + public void releaseClient(XceiverClientSpi client) { + Preconditions.checkNotNull(client); + synchronized (clientCache) { + client.decrementReference(); + } + } + + private XceiverClientSpi getClient(Pipeline pipeline) + throws IOException { + String containerName = pipeline.getContainerName(); + try { + return clientCache.get(containerName, + new Callable<XceiverClientSpi>() { + @Override + public XceiverClientSpi call() throws Exception { + XceiverClientSpi client = pipeline.getType() == RATIS ? + XceiverClientRatis.newXceiverClientRatis(pipeline, conf) + : new XceiverClient(pipeline, conf); + client.connect(); + return client; + } + }); + } catch (Exception e) { + throw new IOException( + "Exception getting XceiverClient: " + e.toString(), e); + } + } + + /** + * Close and remove all the cached clients. + */ + public void close() { + //closing is done through RemovalListener + clientCache.invalidateAll(); + clientCache.cleanUp(); + + if (metrics != null) { + metrics.unRegister(); + } + } + + /** + * Tells us if Ratis is enabled for this cluster. + * @return True if Ratis is enabled. + */ + public boolean isUseRatis() { + return useRatis; + } + + /** + * Returns hard coded 3 as replication factor. + * @return 3 + */ + public HddsProtos.ReplicationFactor getFactor() { + if(isUseRatis()) { + return HddsProtos.ReplicationFactor.THREE; + } + return HddsProtos.ReplicationFactor.ONE; + } + + /** + * Returns the default replication type. + * @return Ratis or Standalone + */ + public HddsProtos.ReplicationType getType() { + // TODO : Fix me and make Ratis default before release. + // TODO: Remove this as replication factor and type are pipeline properties + if(isUseRatis()) { + return HddsProtos.ReplicationType.RATIS; + } + return HddsProtos.ReplicationType.STAND_ALONE; + } + + /** + * Get xceiver client metric. + */ + public synchronized static XceiverClientMetrics getXceiverClientMetrics() { + if (metrics == null) { + metrics = XceiverClientMetrics.create(); + } + + return metrics; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java new file mode 100644 index 0000000..a61eba1 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * The client metrics for the Storage Container protocol. + */ +@InterfaceAudience.Private +@Metrics(about = "Storage Container Client Metrics", context = "dfs") +public class XceiverClientMetrics { + public static final String SOURCE_NAME = XceiverClientMetrics.class + .getSimpleName(); + + private @Metric MutableCounterLong pendingOps; + private MutableCounterLong[] pendingOpsArray; + private MutableRate[] containerOpsLatency; + private MetricsRegistry registry; + + public XceiverClientMetrics() { + int numEnumEntries = ContainerProtos.Type.values().length; + this.registry = new MetricsRegistry(SOURCE_NAME); + + this.pendingOpsArray = new MutableCounterLong[numEnumEntries]; + this.containerOpsLatency = new MutableRate[numEnumEntries]; + for (int i = 0; i < numEnumEntries; i++) { + pendingOpsArray[i] = registry.newCounter( + "numPending" + ContainerProtos.Type.valueOf(i + 1), + "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops", + (long) 0); + + containerOpsLatency[i] = registry.newRate( + ContainerProtos.Type.valueOf(i + 1) + "Latency", + "latency of " + ContainerProtos.Type.valueOf(i + 1) + + " ops"); + } + } + + public static XceiverClientMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, "Storage Container Client Metrics", + new XceiverClientMetrics()); + } + + public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) { + pendingOps.incr(); + pendingOpsArray[type.ordinal()].incr(); + } + + public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) { + pendingOps.incr(-1); + pendingOpsArray[type.ordinal()].incr(-1); + } + + public void addContainerOpsLatency(ContainerProtos.Type type, + long latencyNanos) { + containerOpsLatency[type.ordinal()].add(latencyNanos); + } + + public long getContainerOpsMetrics(ContainerProtos.Type type) { + return pendingOpsArray[type.ordinal()].value(); + } + + public void unRegister() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java new file mode 100644 index 0000000..d010c69 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An abstract implementation of {@link XceiverClientSpi} using Ratis. + * The underlying RPC mechanism can be chosen via the constructor. + */ +public final class XceiverClientRatis extends XceiverClientSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); + + public static XceiverClientRatis newXceiverClientRatis( + Pipeline pipeline, Configuration ozoneConf) { + final String rpcType = ozoneConf.get( + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(ozoneConf); + return new XceiverClientRatis(pipeline, + SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests); + } + + private final Pipeline pipeline; + private final RpcType rpcType; + private final AtomicReference<RaftClient> client = new AtomicReference<>(); + private final int maxOutstandingRequests; + + /** + * Constructs a client. + */ + private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, + int maxOutStandingChunks) { + super(); + this.pipeline = pipeline; + this.rpcType = rpcType; + this.maxOutstandingRequests = maxOutStandingChunks; + } + + /** + * {@inheritDoc} + */ + public void createPipeline(String clusterId, List<DatanodeDetails> datanodes) + throws IOException { + RaftGroup group = RatisHelper.newRaftGroup(datanodes); + LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, + group.getPeers()); + reinitialize(datanodes, group); + } + + /** + * Returns Ratis as pipeline Type. + * + * @return - Ratis + */ + @Override + public HddsProtos.ReplicationType getPipelineType() { + return HddsProtos.ReplicationType.RATIS; + } + + private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group) + throws IOException { + if (datanodes.isEmpty()) { + return; + } + + IOException exception = null; + for (DatanodeDetails d : datanodes) { + try { + reinitialize(d, group); + } catch (IOException ioe) { + if (exception == null) { + exception = new IOException( + "Failed to reinitialize some of the RaftPeer(s)", ioe); + } else { + exception.addSuppressed(ioe); + } + } + } + if (exception != null) { + throw exception; + } + } + + /** + * Adds a new peers to the Ratis Ring. + * + * @param datanode - new datanode + * @param group - Raft group + * @throws IOException - on Failure. + */ + private void reinitialize(DatanodeDetails datanode, RaftGroup group) + throws IOException { + final RaftPeer p = RatisHelper.toRaftPeer(datanode); + try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { + client.reinitialize(group, p.getId()); + } catch (IOException ioe) { + LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", + p, datanode, ioe); + throw new IOException("Failed to reinitialize RaftPeer " + p + + "(datanode=" + datanode + ")", ioe); + } + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public void connect() throws Exception { + LOG.debug("Connecting to pipeline:{} leader:{}", + getPipeline().getPipelineName(), + RatisHelper.toRaftPeerId(pipeline.getLeader())); + // TODO : XceiverClient ratis should pass the config value of + // maxOutstandingRequests so as to set the upper bound on max no of async + // requests to be handled by raft client + if (!client.compareAndSet(null, + RatisHelper.newRaftClient(rpcType, getPipeline()))) { + throw new IllegalStateException("Client is already connected."); + } + } + + @Override + public void close() { + final RaftClient c = client.getAndSet(null); + if (c != null) { + try { + c.close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + + private RaftClient getClient() { + return Objects.requireNonNull(client.get(), "client is null"); + } + + private boolean isReadOnly(ContainerCommandRequestProto proto) { + switch (proto.getCmdType()) { + case ReadContainer: + case ReadChunk: + case ListKey: + case GetKey: + case GetSmallFile: + case ListContainer: + case ListChunk: + return true; + case CloseContainer: + case WriteChunk: + case UpdateContainer: + case CompactChunk: + case CreateContainer: + case DeleteChunk: + case DeleteContainer: + case DeleteKey: + case PutKey: + case PutSmallFile: + default: + return false; + } + } + + private RaftClientReply sendRequest(ContainerCommandRequestProto request) + throws IOException { + boolean isReadOnlyRequest = isReadOnly(request); + ByteString byteString = + ShadedProtoUtil.asShadedByteString(request.toByteArray()); + LOG.debug("sendCommand {} {}", isReadOnlyRequest, request); + final RaftClientReply reply = isReadOnlyRequest ? + getClient().sendReadOnly(() -> byteString) : + getClient().send(() -> byteString); + LOG.debug("reply {} {}", isReadOnlyRequest, reply); + return reply; + } + + private CompletableFuture<RaftClientReply> sendRequestAsync( + ContainerCommandRequestProto request) throws IOException { + boolean isReadOnlyRequest = isReadOnly(request); + ByteString byteString = + ShadedProtoUtil.asShadedByteString(request.toByteArray()); + LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); + return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : + getClient().sendAsync(() -> byteString); + } + + @Override + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException { + final RaftClientReply reply = sendRequest(request); + Preconditions.checkState(reply.isSuccess()); + return ContainerCommandResponseProto.parseFrom( + ShadedProtoUtil.asByteString(reply.getMessage().getContent())); + } + + /** + * Sends a given command to server gets a waitable future back. + * + * @param request Request + * @return Response to the command + * @throws IOException + */ + @Override + public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( + ContainerCommandRequestProto request) + throws IOException, ExecutionException, InterruptedException { + return sendRequestAsync(request).whenComplete((reply, e) -> + LOG.debug("received reply {} for request: {} exception: {}", request, + reply, e)) + .thenApply(reply -> { + try { + return ContainerCommandResponseProto.parseFrom( + ShadedProtoUtil.asByteString(reply.getMessage().getContent())); + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java new file mode 100644 index 0000000..8f30a7f --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.client; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ReadContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.UUID; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState + .ALLOCATED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState + .OPEN; + +/** + * This class provides the client-facing APIs of container operations. + */ +public class ContainerOperationClient implements ScmClient { + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerOperationClient.class); + private static long containerSizeB = -1; + private final StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private final XceiverClientManager xceiverClientManager; + + public ContainerOperationClient( + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient, + XceiverClientManager xceiverClientManager) { + this.storageContainerLocationClient = storageContainerLocationClient; + this.xceiverClientManager = xceiverClientManager; + } + + /** + * Return the capacity of containers. The current assumption is that all + * containers have the same capacity. Therefore one static is sufficient for + * any container. + * @return The capacity of one container in number of bytes. + */ + public static long getContainerSizeB() { + return containerSizeB; + } + + /** + * Set the capacity of container. Should be exactly once on system start. + * @param size Capacity of one container in number of bytes. + */ + public static void setContainerSizeB(long size) { + containerSizeB = size; + } + + /** + * @inheritDoc + */ + @Override + public Pipeline createContainer(String containerId, String owner) + throws IOException { + XceiverClientSpi client = null; + try { + Pipeline pipeline = + storageContainerLocationClient.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerId, owner); + client = xceiverClientManager.acquireClient(pipeline); + + // Allocated State means that SCM has allocated this pipeline in its + // namespace. The client needs to create the pipeline on the machines + // which was choosen by the SCM. + Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED || + pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state"); + if (pipeline.getLifeCycleState() == ALLOCATED) { + createPipeline(client, pipeline); + } + // TODO : Container Client State needs to be updated. + // TODO : Return ContainerInfo instead of Pipeline + createContainer(containerId, client, pipeline); + return pipeline; + } finally { + if (client != null) { + xceiverClientManager.releaseClient(client); + } + } + } + + /** + * Create a container over pipeline specified by the SCM. + * + * @param containerId - Container ID + * @param client - Client to communicate with Datanodes + * @param pipeline - A pipeline that is already created. + * @throws IOException + */ + public void createContainer(String containerId, XceiverClientSpi client, + Pipeline pipeline) throws IOException { + String traceID = UUID.randomUUID().toString(); + storageContainerLocationClient.notifyObjectStageChange( + ObjectStageChangeRequestProto.Type.container, + containerId, + ObjectStageChangeRequestProto.Op.create, + ObjectStageChangeRequestProto.Stage.begin); + ContainerProtocolCalls.createContainer(client, traceID); + storageContainerLocationClient.notifyObjectStageChange( + ObjectStageChangeRequestProto.Type.container, + containerId, + ObjectStageChangeRequestProto.Op.create, + ObjectStageChangeRequestProto.Stage.complete); + + // Let us log this info after we let SCM know that we have completed the + // creation state. + if (LOG.isDebugEnabled()) { + LOG.debug("Created container " + containerId + + " leader:" + pipeline.getLeader() + + " machines:" + pipeline.getMachines()); + } + } + + /** + * Creates a pipeline over the machines choosen by the SCM. + * + * @param client - Client + * @param pipeline - pipeline to be createdon Datanodes. + * @throws IOException + */ + private void createPipeline(XceiverClientSpi client, Pipeline pipeline) + throws IOException { + + Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " + + "name cannot be null when client create flag is set."); + + // Pipeline creation is a three step process. + // + // 1. Notify SCM that this client is doing a create pipeline on + // datanodes. + // + // 2. Talk to Datanodes to create the pipeline. + // + // 3. update SCM that pipeline creation was successful. + storageContainerLocationClient.notifyObjectStageChange( + ObjectStageChangeRequestProto.Type.pipeline, + pipeline.getPipelineName(), + ObjectStageChangeRequestProto.Op.create, + ObjectStageChangeRequestProto.Stage.begin); + + client.createPipeline(pipeline.getPipelineName(), + pipeline.getMachines()); + + storageContainerLocationClient.notifyObjectStageChange( + ObjectStageChangeRequestProto.Type.pipeline, + pipeline.getPipelineName(), + ObjectStageChangeRequestProto.Op.create, + ObjectStageChangeRequestProto.Stage.complete); + + // TODO : Should we change the state on the client side ?? + // That makes sense, but it is not needed for the client to work. + LOG.debug("Pipeline creation successful. Pipeline: {}", + pipeline.toString()); + } + + /** + * @inheritDoc + */ + @Override + public Pipeline createContainer(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, + String containerId, String owner) throws IOException { + XceiverClientSpi client = null; + try { + // allocate container on SCM. + Pipeline pipeline = + storageContainerLocationClient.allocateContainer(type, factor, + containerId, owner); + client = xceiverClientManager.acquireClient(pipeline); + + // Allocated State means that SCM has allocated this pipeline in its + // namespace. The client needs to create the pipeline on the machines + // which was choosen by the SCM. + if (pipeline.getLifeCycleState() == ALLOCATED) { + createPipeline(client, pipeline); + } + + // TODO : Return ContainerInfo instead of Pipeline + // connect to pipeline leader and allocate container on leader datanode. + client = xceiverClientManager.acquireClient(pipeline); + createContainer(containerId, client, pipeline); + return pipeline; + } finally { + if (client != null) { + xceiverClientManager.releaseClient(client); + } + } + } + + /** + * Returns a set of Nodes that meet a query criteria. + * + * @param nodeStatuses - A set of criteria that we want the node to have. + * @param queryScope - Query scope - Cluster or pool. + * @param poolName - if it is pool, a pool name is required. + * @return A set of nodes that meet the requested criteria. + * @throws IOException + */ + @Override + public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> + nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) + throws IOException { + return storageContainerLocationClient.queryNode(nodeStatuses, queryScope, + poolName); + } + + /** + * Creates a specified replication pipeline. + */ + @Override + public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) + throws IOException { + return storageContainerLocationClient.createReplicationPipeline(type, + factor, nodePool); + } + + /** + * Delete the container, this will release any resource it uses. + * @param pipeline - Pipeline that represents the container. + * @param force - True to forcibly delete the container. + * @throws IOException + */ + @Override + public void deleteContainer(Pipeline pipeline, boolean force) + throws IOException { + XceiverClientSpi client = null; + try { + client = xceiverClientManager.acquireClient(pipeline); + String traceID = UUID.randomUUID().toString(); + ContainerProtocolCalls.deleteContainer(client, force, traceID); + storageContainerLocationClient + .deleteContainer(pipeline.getContainerName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleted container {}, leader: {}, machines: {} ", + pipeline.getContainerName(), + pipeline.getLeader(), + pipeline.getMachines()); + } + } finally { + if (client != null) { + xceiverClientManager.releaseClient(client); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<ContainerInfo> listContainer(String startName, + String prefixName, int count) + throws IOException { + return storageContainerLocationClient.listContainer( + startName, prefixName, count); + } + + /** + * Get meta data from an existing container. + * + * @param pipeline - pipeline that represents the container. + * @return ContainerInfo - a message of protobuf which has basic info + * of a container. + * @throws IOException + */ + @Override + public ContainerData readContainer(Pipeline pipeline) throws IOException { + XceiverClientSpi client = null; + try { + client = xceiverClientManager.acquireClient(pipeline); + String traceID = UUID.randomUUID().toString(); + ReadContainerResponseProto response = + ContainerProtocolCalls.readContainer(client, + pipeline.getContainerName(), traceID); + if (LOG.isDebugEnabled()) { + LOG.debug("Read container {}, leader: {}, machines: {} ", + pipeline.getContainerName(), + pipeline.getLeader(), + pipeline.getMachines()); + } + return response.getContainerData(); + } finally { + if (client != null) { + xceiverClientManager.releaseClient(client); + } + } + } + + /** + * Given an id, return the pipeline associated with the container. + * @param containerId - String Container ID + * @return Pipeline of the existing container, corresponding to the given id. + * @throws IOException + */ + @Override + public Pipeline getContainer(String containerId) throws + IOException { + return storageContainerLocationClient.getContainer(containerId); + } + + /** + * Close a container. + * + * @param pipeline the container to be closed. + * @throws IOException + */ + @Override + public void closeContainer(Pipeline pipeline) throws IOException { + XceiverClientSpi client = null; + try { + LOG.debug("Close container {}", pipeline); + /* + TODO: two orders here, revisit this later: + 1. close on SCM first, then on data node + 2. close on data node first, then on SCM + + with 1: if client failed after closing on SCM, then there is a + container SCM thinks as closed, but is actually open. Then SCM will no + longer allocate block to it, which is fine. But SCM may later try to + replicate this "closed" container, which I'm not sure is safe. + + with 2: if client failed after close on datanode, then there is a + container SCM thinks as open, but is actually closed. Then SCM will still + try to allocate block to it. Which will fail when actually doing the + write. No more data can be written, but at least the correctness and + consistency of existing data will maintain. + + For now, take the #2 way. + */ + // Actually close the container on Datanode + client = xceiverClientManager.acquireClient(pipeline); + String traceID = UUID.randomUUID().toString(); + + String containerId = pipeline.getContainerName(); + + storageContainerLocationClient.notifyObjectStageChange( + ObjectStageChangeRequestProto.Type.container, + containerId, + ObjectStageChangeRequestProto.Op.close, + ObjectStageChangeRequestProto.Stage.begin); + + ContainerProtocolCalls.closeContainer(client, traceID); + // Notify SCM to close the container + storageContainerLocationClient.notifyObjectStageChange( + ObjectStageChangeRequestProto.Type.container, + containerId, + ObjectStageChangeRequestProto.Op.close, + ObjectStageChangeRequestProto.Stage.complete); + } finally { + if (client != null) { + xceiverClientManager.releaseClient(client); + } + } + } + + /** + * Get the the current usage information. + * @param pipeline - Pipeline + * @return the size of the given container. + * @throws IOException + */ + @Override + public long getContainerSize(Pipeline pipeline) throws IOException { + // TODO : Pipeline can be null, handle it correctly. + long size = getContainerSizeB(); + if (size == -1) { + throw new IOException("Container size unknown!"); + } + return size; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java new file mode 100644 index 0000000..bc5f8d6 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.client; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.TimeUnit; + +/** + * Utility methods for Ozone and Container Clients. + * + * The methods to retrieve SCM service endpoints assume there is a single + * SCM service instance. This will change when we switch to replicated service + * instances for redundancy. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public final class HddsClientUtils { + + private static final Logger LOG = LoggerFactory.getLogger( + HddsClientUtils.class); + + private static final int NO_PORT = -1; + + private HddsClientUtils() { + } + + /** + * Date format that used in ozone. Here the format is thread safe to use. + */ + private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT = + ThreadLocal.withInitial(() -> { + DateTimeFormatter format = + DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT); + return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE)); + }); + + + /** + * Convert time in millisecond to a human readable format required in ozone. + * @return a human readable string for the input time + */ + public static String formatDateTime(long millis) { + ZonedDateTime dateTime = ZonedDateTime.ofInstant( + Instant.ofEpochSecond(millis), DATE_FORMAT.get().getZone()); + return DATE_FORMAT.get().format(dateTime); + } + + /** + * Convert time in ozone date format to millisecond. + * @return time in milliseconds + */ + public static long formatDateTime(String date) throws ParseException { + Preconditions.checkNotNull(date, "Date string should not be null."); + return ZonedDateTime.parse(date, DATE_FORMAT.get()) + .toInstant().getEpochSecond(); + } + + + + /** + * verifies that bucket name / volume name is a valid DNS name. + * + * @param resName Bucket or volume Name to be validated + * + * @throws IllegalArgumentException + */ + public static void verifyResourceName(String resName) + throws IllegalArgumentException { + + if (resName == null) { + throw new IllegalArgumentException("Bucket or Volume name is null"); + } + + if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) || + (resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) { + throw new IllegalArgumentException( + "Bucket or Volume length is illegal, " + + "valid length is 3-63 characters"); + } + + if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) { + throw new IllegalArgumentException( + "Bucket or Volume name cannot start with a period or dash"); + } + + if ((resName.charAt(resName.length() - 1) == '.') || + (resName.charAt(resName.length() - 1) == '-')) { + throw new IllegalArgumentException( + "Bucket or Volume name cannot end with a period or dash"); + } + + boolean isIPv4 = true; + char prev = (char) 0; + + for (int index = 0; index < resName.length(); index++) { + char currChar = resName.charAt(index); + + if (currChar != '.') { + isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4; + } + + if (currChar > 'A' && currChar < 'Z') { + throw new IllegalArgumentException( + "Bucket or Volume name does not support uppercase characters"); + } + + if ((currChar != '.') && (currChar != '-')) { + if ((currChar < '0') || (currChar > '9' && currChar < 'a') || + (currChar > 'z')) { + throw new IllegalArgumentException("Bucket or Volume name has an " + + "unsupported character : " + + currChar); + } + } + + if ((prev == '.') && (currChar == '.')) { + throw new IllegalArgumentException("Bucket or Volume name should not " + + "have two contiguous periods"); + } + + if ((prev == '-') && (currChar == '.')) { + throw new IllegalArgumentException( + "Bucket or Volume name should not have period after dash"); + } + + if ((prev == '.') && (currChar == '-')) { + throw new IllegalArgumentException( + "Bucket or Volume name should not have dash after period"); + } + prev = currChar; + } + + if (isIPv4) { + throw new IllegalArgumentException( + "Bucket or Volume name cannot be an IPv4 address or all numeric"); + } + } + + /** + * Returns the cache value to be used for list calls. + * @param conf Configuration object + * @return list cache size + */ + public static int getListCacheSize(Configuration conf) { + return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE, + OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT); + } + + /** + * @return a default instance of {@link CloseableHttpClient}. + */ + public static CloseableHttpClient newHttpClient() { + return HddsClientUtils.newHttpClient(new Configuration()); + } + + /** + * Returns a {@link CloseableHttpClient} configured by given configuration. + * If conf is null, returns a default instance. + * + * @param conf configuration + * @return a {@link CloseableHttpClient} instance. + */ + public static CloseableHttpClient newHttpClient(Configuration conf) { + long socketTimeout = OzoneConfigKeys + .OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT; + long connectionTimeout = OzoneConfigKeys + .OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT; + if (conf != null) { + socketTimeout = conf.getTimeDuration( + OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + connectionTimeout = conf.getTimeDuration( + OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + } + + CloseableHttpClient client = HttpClients.custom() + .setDefaultRequestConfig( + RequestConfig.custom() + .setSocketTimeout(Math.toIntExact(socketTimeout)) + .setConnectTimeout(Math.toIntExact(connectionTimeout)) + .build()) + .build(); + return client; + } + + /** + * Returns the maximum no of outstanding async requests to be handled by + * Standalone and Ratis client. + */ + public static int getMaxOutstandingRequests(Configuration config) { + return config + .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS, + ScmConfigKeys + .SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java new file mode 100644 index 0000000..73ad78c --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.client; + +/** + * Client facing classes for the container operations. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java new file mode 100644 index 0000000..9390bc1 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +/** + * Classes for different type of container service client. + */ \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org