http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java new file mode 100644 index 0000000..9b8eaa9 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.storage; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ReadChunkResponseProto; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +/** + * An {@link InputStream} used by the REST service in combination with the + * SCMClient to read the value of a key from a sequence + * of container chunks. All bytes of the key value are stored in container + * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} + * instances. This class encapsulates all state management for iterating + * through the sequence of chunks and the sequence of buffers within each chunk. + */ +public class ChunkInputStream extends InputStream implements Seekable { + + private static final int EOF = -1; + + private final String key; + private final String traceID; + private XceiverClientManager xceiverClientManager; + private XceiverClientSpi xceiverClient; + private List<ChunkInfo> chunks; + private int chunkIndex; + private long[] chunkOffset; + private List<ByteBuffer> buffers; + private int bufferIndex; + + /** + * Creates a new ChunkInputStream. 
+ * + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param chunks list of chunks to read + * @param traceID container protocol call traceID + */ + public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, + XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) { + this.key = key; + this.traceID = traceID; + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.chunks = chunks; + this.chunkIndex = -1; + // chunkOffset[i] stores offset at which chunk i stores data in + // ChunkInputStream + this.chunkOffset = new long[this.chunks.size()]; + initializeChunkOffset(); + this.buffers = null; + this.bufferIndex = 0; + } + + private void initializeChunkOffset() { + int tempOffset = 0; + for (int i = 0; i < chunks.size(); i++) { + chunkOffset[i] = tempOffset; + tempOffset += chunks.get(i).getLen(); + } + } + + @Override + public synchronized int read() + throws IOException { + checkOpen(); + int available = prepareRead(1); + return available == EOF ? EOF : + Byte.toUnsignedInt(buffers.get(bufferIndex).get()); + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + // According to the JavaDocs for InputStream, it is recommended that + // subclasses provide an override of bulk read if possible for performance + // reasons. In addition to performance, we need to do it for correctness + // reasons. The Ozone REST service uses PipedInputStream and + // PipedOutputStream to relay HTTP response data between a Jersey thread and + // a Netty thread. It turns out that PipedInputStream/PipedOutputStream + // have a subtle dependency (bug?) on the wrapped stream providing separate + // implementations of single-byte read and bulk read. 
Without this, get key + // responses might close the connection before writing all of the bytes + // advertised in the Content-Length. + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + checkOpen(); + int available = prepareRead(len); + if (available == EOF) { + return EOF; + } + buffers.get(bufferIndex).get(b, off, available); + return available; + } + + @Override + public synchronized void close() { + if (xceiverClientManager != null && xceiverClient != null) { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + } + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkInputStream has been closed."); + } + } + + /** + * Prepares to read by advancing through chunks and buffers as needed until it + * finds data to return or encounters EOF. + * + * @param len desired length of data to read + * @return length of data available to read, possibly less than desired length + */ + private synchronized int prepareRead(int len) throws IOException { + for (;;) { + if (chunks == null || chunks.isEmpty()) { + // This must be an empty key. + return EOF; + } else if (buffers == null) { + // The first read triggers fetching the first chunk. + readChunkFromContainer(); + } else if (!buffers.isEmpty() && + buffers.get(bufferIndex).hasRemaining()) { + // Data is available from the current buffer. + ByteBuffer bb = buffers.get(bufferIndex); + return len > bb.remaining() ? bb.remaining() : len; + } else if (!buffers.isEmpty() && + !buffers.get(bufferIndex).hasRemaining() && + bufferIndex < buffers.size() - 1) { + // There are additional buffers available. 
+ ++bufferIndex; + } else if (chunkIndex < chunks.size() - 1) { + // There are additional chunks available. + readChunkFromContainer(); + } else { + // All available input has been consumed. + return EOF; + } + } + } + + /** + * Attempts to read the chunk at the specified offset in the chunk list. If + * successful, then the data of the read chunk is saved so that its bytes can + * be returned from subsequent read calls. + * + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void readChunkFromContainer() throws IOException { + // On every chunk read chunkIndex should be increased so as to read the + // next chunk + chunkIndex += 1; + final ReadChunkResponseProto readChunkResponse; + try { + readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, + chunks.get(chunkIndex), key, traceID); + } catch (IOException e) { + throw new IOException("Unexpected OzoneException: " + e.toString(), e); + } + ByteString byteString = readChunkResponse.getData(); + buffers = byteString.asReadOnlyByteBufferList(); + bufferIndex = 0; + } + + @Override + public synchronized void seek(long pos) throws IOException { + if (pos < 0 || (chunks.size() == 0 && pos > 0) + || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1) + .getLen()) { + throw new EOFException( + "EOF encountered pos: " + pos + " container key: " + key); + } + if (chunkIndex == -1) { + chunkIndex = Arrays.binarySearch(chunkOffset, pos); + } else if (pos < chunkOffset[chunkIndex]) { + chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos); + } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex) + .getLen()) { + chunkIndex = + Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos); + } + if (chunkIndex < 0) { + // Binary search returns -insertionPoint - 1 if element is not present + // in the array. insertionPoint is the point at which element would be + // inserted in the sorted array. 
We need to adjust the chunkIndex + // accordingly so that chunkIndex = insertionPoint - 1 + chunkIndex = -chunkIndex -2; + } + // adjust chunkIndex so that readChunkFromContainer reads the correct chunk + chunkIndex -= 1; + readChunkFromContainer(); + adjustBufferIndex(pos); + } + + private void adjustBufferIndex(long pos) { + long tempOffest = chunkOffset[chunkIndex]; + for (int i = 0; i < buffers.size(); i++) { + if (pos - tempOffest >= buffers.get(i).capacity()) { + tempOffest += buffers.get(i).capacity(); + } else { + bufferIndex = i; + break; + } + } + buffers.get(bufferIndex).position((int) (pos - tempOffest)); + } + + @Override + public synchronized long getPos() throws IOException { + return chunkIndex == -1 ? 0 : + chunkOffset[chunkIndex] + buffers.get(bufferIndex).position(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java new file mode 100644 index 0000000..b65df9f --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.storage; + +import com.google.protobuf.ByteString; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls + .writeChunk; + +/** + * An {@link OutputStream} used by the REST service in combination with the + * SCMClient to write the value of a key to a sequence + * of container chunks. Writes are buffered locally and periodically written to + * the container as a new chunk. In order to preserve the semantics that + * replacement of a pre-existing key is atomic, each instance of the stream has + * an internal unique identifier. This unique identifier and a monotonically + * increasing chunk index form a composite key that is used as the chunk name. + * After all data is written, a putKey call creates or updates the corresponding + * container key, and this call includes the full list of chunks that make up + * the key data. The list of chunks is updated all at once. Therefore, a + * concurrent reader never can see an intermediate state in which different + * chunks of data from different versions of the key data are interleaved. + * This class encapsulates all state management for buffering and writing + * through to the container. 
+ */ +public class ChunkOutputStream extends OutputStream { + + private final String containerKey; + private final String key; + private final String traceID; + private final KeyData.Builder containerKeyData; + private XceiverClientManager xceiverClientManager; + private XceiverClientSpi xceiverClient; + private ByteBuffer buffer; + private final String streamId; + private int chunkIndex; + private int chunkSize; + + /** + * Creates a new ChunkOutputStream. + * + * @param containerKey container key + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param traceID container protocol call args + * @param chunkSize chunk size + */ + public ChunkOutputStream(String containerKey, String key, + XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, + String traceID, int chunkSize) { + this.containerKey = containerKey; + this.key = key; + this.traceID = traceID; + this.chunkSize = chunkSize; + KeyValue keyValue = KeyValue.newBuilder() + .setKey("TYPE").setValue("KEY").build(); + this.containerKeyData = KeyData.newBuilder() + .setContainerName(xceiverClient.getPipeline().getContainerName()) + .setName(containerKey) + .addMetadata(keyValue); + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.buffer = ByteBuffer.allocate(chunkSize); + this.streamId = UUID.randomUUID().toString(); + this.chunkIndex = 0; + } + + @Override + public synchronized void write(int b) throws IOException { + checkOpen(); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put((byte)b); + if (buffer.position() == chunkSize) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > 
b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return; + } + checkOpen(); + while (len > 0) { + int writeLen = Math.min(chunkSize - buffer.position(), len); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put(b, off, writeLen); + if (buffer.position() == chunkSize) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + off += writeLen; + len -= writeLen; + } + } + + @Override + public synchronized void flush() throws IOException { + checkOpen(); + if (buffer.position() > 0) { + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public synchronized void close() throws IOException { + if (xceiverClientManager != null && xceiverClient != null && + buffer != null) { + try { + if (buffer.position() > 0) { + writeChunkToContainer(); + } + putKey(xceiverClient, containerKeyData.build(), traceID); + } catch (IOException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + buffer = null; + } + } + + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkOutputStream has been closed."); + } + } + + /** + * Attempts to flush buffered writes by writing a new chunk to the container. + * If successful, then clears the buffer to prepare to receive writes for a + * new chunk. 
+ * + * @param rollbackPosition position to restore in buffer if write fails + * @param rollbackLimit limit to restore in buffer if write fails + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void flushBufferToChunk(int rollbackPosition, + int rollbackLimit) throws IOException { + boolean success = false; + try { + writeChunkToContainer(); + success = true; + } finally { + if (success) { + buffer.clear(); + } else { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + } + } + } + + /** + * Writes buffered data as a new chunk to the container and saves chunk + * information to be used later in putKey call. + * + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void writeChunkToContainer() throws IOException { + buffer.flip(); + ByteString data = ByteString.copyFrom(buffer); + ChunkInfo chunk = ChunkInfo + .newBuilder() + .setChunkName( + DigestUtils.md5Hex(key) + "_stream_" + + streamId + "_chunk_" + ++chunkIndex) + .setOffset(0) + .setLen(data.size()) + .build(); + try { + writeChunk(xceiverClient, chunk, key, data, traceID); + } catch (IOException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + containerKeyData.addChunks(chunk); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java new file mode 100644 index 0000000..6e7ce94 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +/** + * Low level IO streams to upload/download chunks from container service. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml new file mode 100644 index 0000000..3571a89 --- /dev/null +++ b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml @@ -0,0 +1,21 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<FindBugsFilter> + <Match> + <Package name="org.apache.hadoop.hdds.protocol.proto"/> + </Match> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml new file mode 100644 index 0000000..6b2a156 --- /dev/null +++ b/hadoop-hdds/common/pom.xml @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 +http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds</artifactId> + <version>3.2.0-SNAPSHOT</version> + </parent> + <artifactId>hadoop-hdds-common</artifactId> + <version>3.2.0-SNAPSHOT</version> + <description>Apache Hadoop Distributed Data Store Common</description> + <name>Apache HDDS Common</name> + <packaging>jar</packaging> + + <properties> + <hadoop.component>hdds</hadoop.component> + <is.hadoop.component>true</is.hadoop.component> + </properties> + + <dependencies> + <dependency> + <groupId>org.fusesource.leveldbjni</groupId> + <artifactId>leveldbjni-all</artifactId> + </dependency> + + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <artifactId>ratis-netty</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-grpc</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + + <dependency> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + <version>5.8.0</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + 
<protocVersion>${protobuf.version}</protocVersion> + <protocCommand>${protoc.path}</protocCommand> + <imports> + <param> + ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto + </param> + <param> + ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ + </param> + <param> + ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ + </param> + <param>${basedir}/src/main/proto</param> + </imports> + <source> + <directory>${basedir}/src/main/proto</directory> + <includes> + <include>StorageContainerLocationProtocol.proto</include> + <include>DatanodeContainerProtocol.proto</include> + <include>hdds.proto</include> + <include>ScmBlockLocationProtocol.proto</include> + </includes> + </source> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java new file mode 100644 index 0000000..665618c --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.hdds; + +/** + * Holder class for HDDS configuration keys. It declares no keys in this + * revision and cannot be instantiated. + */ +public class HddsConfigKeys { + /** Private constructor: this class is never instantiated. */ + private HddsConfigKeys() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java new file mode 100644 index 0000000..f00f503 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds; + +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashSet; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED_DEFAULT; + +/** + * HDDS specific stateless utility functions. 
+ */ +public class HddsUtils { + + + private static final Logger LOG = LoggerFactory.getLogger(HddsUtils.class); + + /** + * The service ID of the solitary Ozone SCM service. + */ + public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService"; + public static final String OZONE_SCM_SERVICE_INSTANCE_ID = + "OzoneScmServiceInstance"; + + private static final int NO_PORT = -1; + + private HddsUtils() { + } + + /** + * Retrieve the socket address that should be used by clients to connect + * to the SCM. + * + * @param conf + * @return Target InetSocketAddress for the SCM client endpoint. + */ + public static InetSocketAddress getScmAddressForClients(Configuration conf) { + final Optional<String> host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); + + if (!host.isPresent()) { + throw new IllegalArgumentException( + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + " must be defined. See" + + " https://wiki.apache.org/hadoop/Ozone#Configuration for " + + "details" + + " on configuring Ozone."); + } + + final Optional<Integer> port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); + + return NetUtils.createSocketAddr(host.get() + ":" + port + .or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); + } + + /** + * Retrieve the socket address that should be used by clients to connect + * to the SCM for block service. If + * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined + * then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used. + * + * @param conf + * @return Target InetSocketAddress for the SCM block client endpoint. + * @throws IllegalArgumentException if configuration is not defined. 
+ */ + public static InetSocketAddress getScmAddressForBlockClients( + Configuration conf) { + Optional<String> host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); + + if (!host.isPresent()) { + host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); + if (!host.isPresent()) { + throw new IllegalArgumentException( + ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY + + " must be defined. See" + + " https://wiki.apache.org/hadoop/Ozone#Configuration" + + " for details on configuring Ozone."); + } + } + + final Optional<Integer> port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); + + return NetUtils.createSocketAddr(host.get() + ":" + port + .or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); + } + + /** + * Retrieve the hostname, trying the supplied config keys in order. + * Each config value may be absent, or if present in the format + * host:port (the :port part is optional). + * + * @param conf - Conf + * @param keys a list of configuration key names. + * + * @return first hostname component found from the given keys, or absent. + * @throws IllegalArgumentException if any values are not in the 'host' + * or host:port format. + */ + public static Optional<String> getHostNameFromConfigKeys(Configuration conf, + String... keys) { + for (final String key : keys) { + final String value = conf.getTrimmed(key); + final Optional<String> hostName = getHostName(value); + if (hostName.isPresent()) { + return hostName; + } + } + return Optional.absent(); + } + + /** + * Gets the hostname or Indicates that it is absent. + * @param value host or host:port + * @return hostname + */ + public static Optional<String> getHostName(String value) { + if ((value == null) || value.isEmpty()) { + return Optional.absent(); + } + return Optional.of(HostAndPort.fromString(value).getHostText()); + } + + /** + * Gets the port if there is one, throws otherwise. 
+ * @param value String in host:port format. + * @return Port + */ + public static Optional<Integer> getHostPort(String value) { + if ((value == null) || value.isEmpty()) { + return Optional.absent(); + } + int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT); + if (port == NO_PORT) { + return Optional.absent(); + } else { + return Optional.of(port); + } + } + + /** + * Retrieve the port number, trying the supplied config keys in order. + * Each config value may be absent, or if present in the format + * host:port (the :port part is optional). + * + * @param conf Conf + * @param keys a list of configuration key names. + * + * @return first port number component found from the given keys, or absent. + * @throws IllegalArgumentException if any values are not in the 'host' + * or host:port format. + */ + public static Optional<Integer> getPortNumberFromConfigKeys( + Configuration conf, String... keys) { + for (final String key : keys) { + final String value = conf.getTrimmed(key); + final Optional<Integer> hostPort = getHostPort(value); + if (hostPort.isPresent()) { + return hostPort; + } + } + return Optional.absent(); + } + + /** + * Retrieve the socket addresses of all storage container managers. + * + * @param conf + * @return A collection of SCM addresses + * @throws IllegalArgumentException If the configuration is invalid + */ + public static Collection<InetSocketAddress> getSCMAddresses( + Configuration conf) throws IllegalArgumentException { + Collection<InetSocketAddress> addresses = + new HashSet<InetSocketAddress>(); + Collection<String> names = + conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES); + if (names == null || names.isEmpty()) { + throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES + + " need to be a set of valid DNS names or IP addresses." 
+ + " Null or empty address list found."); + } + + final com.google.common.base.Optional<Integer> + defaultPort = com.google.common.base.Optional.of(ScmConfigKeys + .OZONE_SCM_DEFAULT_PORT); + for (String address : names) { + com.google.common.base.Optional<String> hostname = + getHostName(address); + if (!hostname.isPresent()) { + throw new IllegalArgumentException("Invalid hostname for SCM: " + + hostname); + } + com.google.common.base.Optional<Integer> port = + getHostPort(address); + InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(), + port.or(defaultPort.get())); + addresses.add(addr); + } + return addresses; + } + + public static boolean isHddsEnabled(Configuration conf) { + String securityEnabled = + conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "simple"); + boolean securityAuthorizationEnabled = conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false); + + if (securityEnabled.equals("kerberos") || securityAuthorizationEnabled) { + LOG.error("Ozone is not supported in a security enabled cluster. "); + return false; + } else { + return conf.getBoolean(OZONE_ENABLED, OZONE_ENABLED_DEFAULT); + } + } + + + /** + * Get the path for datanode id file. + * + * @param conf - Configuration + * @return the path of datanode id as string + */ + public static String getDatanodeIdFilePath(Configuration conf) { + String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID); + if (dataNodeIDPath == null) { + String metaPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS); + if (Strings.isNullOrEmpty(metaPath)) { + // this means meta data is not found, in theory should not happen at + // this point because should've failed earlier. 
+ throw new IllegalArgumentException("Unable to locate meta data" + + "directory when getting datanode id path"); + } + dataNodeIDPath = Paths.get(metaPath, + ScmConfigKeys.OZONE_SCM_DATANODE_ID_PATH_DEFAULT).toString(); + } + return dataNodeIDPath; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/OzoneQuota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/OzoneQuota.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/OzoneQuota.java new file mode 100644 index 0000000..59708a9 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/OzoneQuota.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.client; + +import org.apache.hadoop.ozone.OzoneConsts; + + +/** + * represents an OzoneQuota Object that can be applied to + * a storage volume. 
+ */ +public class OzoneQuota { + + public static final String OZONE_QUOTA_BYTES = "BYTES"; + public static final String OZONE_QUOTA_MB = "MB"; + public static final String OZONE_QUOTA_GB = "GB"; + public static final String OZONE_QUOTA_TB = "TB"; + + private Units unit; + private long size; + + /** Quota Units.*/ + public enum Units {UNDEFINED, BYTES, KB, MB, GB, TB} + + /** + * Returns size. + * + * @return long + */ + public long getSize() { + return size; + } + + /** + * Returns Units. + * + * @return Unit in MB, GB or TB + */ + public Units getUnit() { + return unit; + } + + /** + * Constructs a default Quota object. + */ + public OzoneQuota() { + this.size = 0; + this.unit = Units.UNDEFINED; + } + + /** + * Constructor for Ozone Quota. + * + * @param size Long Size + * @param unit MB, GB or TB + */ + public OzoneQuota(long size, Units unit) { + this.size = size; + this.unit = unit; + } + + /** + * Formats a quota as a string. + * + * @param quota the quota to format + * @return string representation of quota + */ + public static String formatQuota(OzoneQuota quota) { + return String.valueOf(quota.size) + quota.unit; + } + + /** + * Parses a user provided string and returns the + * Quota Object. 
+ * + * @param quotaString Quota String + * + * @return OzoneQuota object + * + * @throws IllegalArgumentException + */ + public static OzoneQuota parseQuota(String quotaString) + throws IllegalArgumentException { + + if ((quotaString == null) || (quotaString.isEmpty())) { + throw new IllegalArgumentException( + "Quota string cannot be null or empty."); + } + + String uppercase = quotaString.toUpperCase().replaceAll("\\s+", ""); + String size = ""; + int nSize; + Units currUnit = Units.MB; + Boolean found = false; + if (uppercase.endsWith(OZONE_QUOTA_MB)) { + size = uppercase + .substring(0, uppercase.length() - OZONE_QUOTA_MB.length()); + currUnit = Units.MB; + found = true; + } + + if (uppercase.endsWith(OZONE_QUOTA_GB)) { + size = uppercase + .substring(0, uppercase.length() - OZONE_QUOTA_GB.length()); + currUnit = Units.GB; + found = true; + } + + if (uppercase.endsWith(OZONE_QUOTA_TB)) { + size = uppercase + .substring(0, uppercase.length() - OZONE_QUOTA_TB.length()); + currUnit = Units.TB; + found = true; + } + + if (uppercase.endsWith(OZONE_QUOTA_BYTES)) { + size = uppercase + .substring(0, uppercase.length() - OZONE_QUOTA_BYTES.length()); + currUnit = Units.BYTES; + found = true; + } + + if (!found) { + throw new IllegalArgumentException( + "Quota unit not recognized. Supported values are BYTES, MB, GB and " + + "TB."); + } + + nSize = Integer.parseInt(size); + if (nSize < 0) { + throw new IllegalArgumentException("Quota cannot be negative."); + } + + return new OzoneQuota(nSize, currUnit); + } + + + /** + * Returns size in Bytes or -1 if there is no Quota. + */ + public long sizeInBytes() { + switch (this.unit) { + case BYTES: + return this.getSize(); + case MB: + return this.getSize() * OzoneConsts.MB; + case GB: + return this.getSize() * OzoneConsts.GB; + case TB: + return this.getSize() * OzoneConsts.TB; + case UNDEFINED: + default: + return -1; + } + } + + /** + * Returns OzoneQuota corresponding to size in bytes. 
+ * + * @param sizeInBytes size in bytes to be converted + * + * @return OzoneQuota object + */ + public static OzoneQuota getOzoneQuota(long sizeInBytes) { + long size; + Units unit; + if (sizeInBytes % OzoneConsts.TB == 0) { + size = sizeInBytes / OzoneConsts.TB; + unit = Units.TB; + } else if (sizeInBytes % OzoneConsts.GB == 0) { + size = sizeInBytes / OzoneConsts.GB; + unit = Units.GB; + } else if (sizeInBytes % OzoneConsts.MB == 0) { + size = sizeInBytes / OzoneConsts.MB; + unit = Units.MB; + } else { + size = sizeInBytes; + unit = Units.BYTES; + } + return new OzoneQuota((int)size, unit); + } + + @Override + public String toString() { + return size + " " + unit; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java new file mode 100644 index 0000000..0215964 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.client; + +/** + * The replication factor to be used while writing key into ozone. + */ +public enum ReplicationFactor { + ONE(1), + THREE(3); + + /** + * Integer representation of replication. + */ + private int value; + + /** + * Initializes ReplicationFactor with value. + * @param value replication value + */ + ReplicationFactor(int value) { + this.value = value; + } + + /** + * Returns enum value corresponding to the int value. + * @param value replication value + * @return ReplicationFactor + */ + public static ReplicationFactor valueOf(int value) { + if(value == 1) { + return ONE; + } + if (value == 3) { + return THREE; + } + throw new IllegalArgumentException("Unsupported value: " + value); + } + + /** + * Returns integer representation of ReplicationFactor. + * @return replication value + */ + public int getValue() { + return value; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java new file mode 100644 index 0000000..259a1a2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.client; + +/** + * The replication type to be used while writing key into ozone. + */ +public enum ReplicationType { + RATIS, + STAND_ALONE, + CHAINED +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/package-info.java new file mode 100644 index 0000000..e81f134 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.client; + +/** + * Base property types for HDDS containers and replications. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java new file mode 100644 index 0000000..f07718c --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.conf; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +/** + * Configuration for ozone. + */ +@InterfaceAudience.Private +public class OzoneConfiguration extends Configuration { + static { + activate(); + } + + public OzoneConfiguration() { + OzoneConfiguration.activate(); + } + + public OzoneConfiguration(Configuration conf) { + super(conf); + } + + public List<Property> readPropertyFromXml(URL url) throws JAXBException { + JAXBContext context = JAXBContext.newInstance(XMLConfiguration.class); + Unmarshaller um = context.createUnmarshaller(); + + XMLConfiguration config = (XMLConfiguration) um.unmarshal(url); + return config.getProperties(); + } + + /** + * Class to marshall/un-marshall configuration from xml files. + */ + @XmlAccessorType(XmlAccessType.FIELD) + @XmlRootElement(name = "configuration") + public static class XMLConfiguration { + + @XmlElement(name = "property", type = Property.class) + private List<Property> properties = new ArrayList<>(); + + public XMLConfiguration() { + } + + public XMLConfiguration(List<Property> properties) { + this.properties = properties; + } + + public List<Property> getProperties() { + return properties; + } + + public void setProperties(List<Property> properties) { + this.properties = properties; + } + } + + /** + * Class to marshall/un-marshall configuration properties from xml files. 
+ */ + @XmlAccessorType(XmlAccessType.FIELD) + @XmlRootElement(name = "property") + public static class Property implements Comparable<Property> { + + private String name; + private String value; + private String tag; + private String description; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + @Override + public int compareTo(Property o) { + if (this == o) { + return 0; + } + return this.getName().compareTo(o.getName()); + } + + @Override + public String toString() { + return this.getName() + " " + this.getValue() + this.getTag(); + } + + @Override + public int hashCode(){ + return this.getName().hashCode(); + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof Property) && (((Property) obj).getName()) + .equals(this.getName()); + } + } + + public static void activate(){ + // adds the default resources + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + Configuration.addDefaultResource("ozone-default.xml"); + Configuration.addDefaultResource("ozone-site.xml"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/package-info.java new file mode 100644 index 0000000..948057e --- /dev/null +++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.conf; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/package-info.java new file mode 100644 index 0000000..f8894e6 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds; + +/** + * Generic HDDS specific configurator and helper classes. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java new file mode 100644 index 0000000..1463591 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.protocol; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.util.UUID; + +/** + * DatanodeDetails class contains details about DataNode like: + * - UUID of the DataNode. + * - IP and Hostname details. + * - Port details to which the DataNode will be listening. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class DatanodeDetails implements Comparable<DatanodeDetails> { + + /** + * DataNode's unique identifier in the cluster. + */ + private final UUID uuid; + + private String ipAddress; + private String hostName; + private Integer infoPort; + private Integer infoSecurePort; + private Integer containerPort; + private Integer ratisPort; + private Integer ozoneRestPort; + + + /** + * Constructs DatanodeDetails instance. DatanodeDetails.Builder is used + * for instantiating DatanodeDetails. + * @param uuid DataNode's UUID + * @param ipAddress IP Address of this DataNode + * @param hostName DataNode's hostname + * @param infoPort HTTP Port + * @param infoSecurePort HTTPS Port + * @param containerPort Container Port + * @param ratisPort Ratis Port + * @param ozoneRestPort Rest Port + */ + private DatanodeDetails( + String uuid, String ipAddress, String hostName, Integer infoPort, + Integer infoSecurePort, Integer containerPort, Integer ratisPort, + Integer ozoneRestPort) { + this.uuid = UUID.fromString(uuid); + this.ipAddress = ipAddress; + this.hostName = hostName; + this.infoPort = infoPort; + this.infoSecurePort = infoSecurePort; + this.containerPort = containerPort; + this.ratisPort = ratisPort; + this.ozoneRestPort = ozoneRestPort; + } + + /** + * Returns the DataNode UUID. 
+ * + * @return UUID of DataNode + */ + public UUID getUuid() { + return uuid; + } + + /** + * Returns the string representation of DataNode UUID. + * + * @return UUID of DataNode + */ + public String getUuidString() { + return uuid.toString(); + } + + /** + * Sets the IP address of Datanode. + * + * @param ip IP Address + */ + public void setIpAddress(String ip) { + this.ipAddress = ip; + } + + /** + * Returns IP address of DataNode. + * + * @return IP address + */ + public String getIpAddress() { + return ipAddress; + } + + /** + * Sets the Datanode hostname. + * + * @param host hostname + */ + public void setHostName(String host) { + this.hostName = host; + } + + /** + * Returns Hostname of DataNode. + * + * @return Hostname + */ + public String getHostName() { + return hostName; + } + + /** + * Sets the InfoPort. + * @param port InfoPort + */ + public void setInfoPort(int port) { + infoPort = port; + } + + /** + * Returns DataNodes Info Port. + * + * @return InfoPort + */ + public int getInfoPort() { + return infoPort; + } + + /** + * Sets the InfoSecurePort. + * + * @param port InfoSecurePort + */ + public void setInfoSecurePort(int port) { + infoSecurePort = port; + } + + /** + * Returns DataNodes Secure Info Port. + * + * @return InfoSecurePort + */ + public int getInfoSecurePort() { + return infoSecurePort; + } + + /** + * Sets the Container Port. + * @param port ContainerPort + */ + public void setContainerPort(int port) { + containerPort = port; + } + + /** + * Returns standalone container Port. + * + * @return Container Port + */ + public int getContainerPort() { + return containerPort; + } + + /** + * Sets Ratis Port. + * @param port RatisPort + */ + public void setRatisPort(int port) { + ratisPort = port; + } + + + /** + * Returns Ratis Port. + * @return Ratis Port + */ + public int getRatisPort() { + return ratisPort; + } + + + /** + * Sets OzoneRestPort. 
+ * @param port OzoneRestPort + */ + public void setOzoneRestPort(int port) { + ozoneRestPort = port; + } + + /** + * Returns Ozone Rest Port. + * @return OzoneRestPort + */ + public int getOzoneRestPort() { + return ozoneRestPort; + } + + /** + * Returns a DatanodeDetails from the protocol buffers. + * + * @param datanodeDetailsProto - protoBuf Message + * @return DatanodeDetails + */ + public static DatanodeDetails getFromProtoBuf( + HddsProtos.DatanodeDetailsProto datanodeDetailsProto) { + DatanodeDetails.Builder builder = newBuilder(); + builder.setUuid(datanodeDetailsProto.getUuid()); + if (datanodeDetailsProto.hasIpAddress()) { + builder.setIpAddress(datanodeDetailsProto.getIpAddress()); + } + if (datanodeDetailsProto.hasHostName()) { + builder.setHostName(datanodeDetailsProto.getHostName()); + } + if (datanodeDetailsProto.hasInfoPort()) { + builder.setInfoPort(datanodeDetailsProto.getInfoPort()); + } + if (datanodeDetailsProto.hasInfoSecurePort()) { + builder.setInfoSecurePort(datanodeDetailsProto.getInfoSecurePort()); + } + if (datanodeDetailsProto.hasContainerPort()) { + builder.setContainerPort(datanodeDetailsProto.getContainerPort()); + } + if (datanodeDetailsProto.hasRatisPort()) { + builder.setRatisPort(datanodeDetailsProto.getRatisPort()); + } + if (datanodeDetailsProto.hasOzoneRestPort()) { + builder.setOzoneRestPort(datanodeDetailsProto.getOzoneRestPort()); + } + return builder.build(); + } + + /** + * Returns a DatanodeDetails protobuf message from a datanode ID. 
+ * @return HddsProtos.DatanodeDetailsProto + */ + public HddsProtos.DatanodeDetailsProto getProtoBufMessage() { + HddsProtos.DatanodeDetailsProto.Builder builder = + HddsProtos.DatanodeDetailsProto.newBuilder() + .setUuid(getUuidString()); + if (ipAddress != null) { + builder.setIpAddress(ipAddress); + } + if (hostName != null) { + builder.setHostName(hostName); + } + if (infoPort != null) { + builder.setInfoPort(infoPort); + } + if (infoSecurePort != null) { + builder.setInfoSecurePort(infoSecurePort); + } + if (containerPort != null) { + builder.setContainerPort(containerPort); + } + if (ratisPort != null) { + builder.setRatisPort(ratisPort); + } + if (ozoneRestPort != null) { + builder.setOzoneRestPort(ozoneRestPort); + } + return builder.build(); + } + + @Override + public String toString() { + return uuid.toString() + "{" + + "ip: " + + ipAddress + + ", host: " + + hostName + + "}"; + } + + @Override + public int compareTo(DatanodeDetails that) { + return this.getUuid().compareTo(that.getUuid()); + } + + /** + * Returns DatanodeDetails.Builder instance. + * + * @return DatanodeDetails.Builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for building DatanodeDetails. + */ + public static class Builder { + private String id; + private String ipAddress; + private String hostName; + private Integer infoPort; + private Integer infoSecurePort; + private Integer containerPort; + private Integer ratisPort; + private Integer ozoneRestPort; + + /** + * Sets the DatanodeUuid. + * + * @param uuid DatanodeUuid + * @return DatanodeDetails.Builder + */ + public Builder setUuid(String uuid) { + this.id = uuid; + return this; + } + + /** + * Sets the IP address of DataNode. + * + * @param ip address + * @return DatanodeDetails.Builder + */ + public Builder setIpAddress(String ip) { + this.ipAddress = ip; + return this; + } + + /** + * Sets the hostname of DataNode. 
+ * + * @param host hostname + * @return DatanodeDetails.Builder + */ + public Builder setHostName(String host) { + this.hostName = host; + return this; + } + + /** + * Sets the InfoPort. + * + * @param port InfoPort + * @return DatanodeDetails.Builder + */ + public Builder setInfoPort(Integer port) { + this.infoPort = port; + return this; + } + + /** + * Sets the Secure Info Port. + * + * @param port InfoSecurePort + * @return DatanodeDetails.Builder + */ + public Builder setInfoSecurePort(Integer port) { + this.infoSecurePort = port; + return this; + } + + /** + * Sets the ContainerPort. + * + * @param port ContainerPort + * @return DatanodeDetails.Builder + */ + public Builder setContainerPort(Integer port) { + this.containerPort = port; + return this; + } + + /** + * Sets the RatisPort. + * + * @param port RatisPort + * @return DatanodeDetails.Builder + */ + public Builder setRatisPort(Integer port) { + this.ratisPort = port; + return this; + } + + /** + * Sets the OzoneRestPort. + * + * @param port OzoneRestPort + * @return DatanodeDetails.Builder + */ + public Builder setOzoneRestPort(Integer port) { + this.ozoneRestPort = port; + return this; + } + + /** + * Builds and returns DatanodeDetails instance. 
+ * + * @return DatanodeDetails + */ + public DatanodeDetails build() { + Preconditions.checkNotNull(id); + return new DatanodeDetails(id, ipAddress, hostName, + infoPort, infoSecurePort, containerPort, ratisPort, ozoneRestPort); + } + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/package-info.java new file mode 100644 index 0000000..7dae0fc --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains HDDS protocol related classes. 
+ */ +package org.apache.hadoop.hdds.protocol; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java new file mode 100644 index 0000000..7f40ab2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This class contains constants for configuration keys used in SCM. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public final class ScmConfigKeys { + + public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY = + "scm.container.client.idle.threshold"; + public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT = + "10s"; + + public static final String SCM_CONTAINER_CLIENT_MAX_SIZE_KEY = + "scm.container.client.max.size"; + public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT = + 256; + + public static final String SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS = + "scm.container.client.max.outstanding.requests"; + public static final int SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT + = 100; + + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY + = "dfs.container.ratis.enabled"; + public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT + = false; + public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY + = "dfs.container.ratis.rpc.type"; + public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT + = "GRPC"; + public static final String DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY + = "dfs.container.ratis.num.write.chunk.threads"; + public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT + = 60; + public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = + "dfs.container.ratis.segment.size"; + public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT = + 1 * 1024 * 1024 * 1024; + public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY = + "dfs.container.ratis.segment.preallocated.size"; + public static final int + DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = 128 * 1024 * 1024; + + // TODO : this is copied from OzoneConsts, may need to move to a better place + public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size"; + // 16 MB by default + public static final int OZONE_SCM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024; + public static final int 
OZONE_SCM_CHUNK_MAX_SIZE = 32 * 1024 * 1024; + + public static final String OZONE_SCM_CLIENT_PORT_KEY = + "ozone.scm.client.port"; + public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860; + + public static final String OZONE_SCM_DATANODE_PORT_KEY = + "ozone.scm.datanode.port"; + public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861; + + // OZONE_KSM_PORT_DEFAULT = 9862 + public static final String OZONE_SCM_BLOCK_CLIENT_PORT_KEY = + "ozone.scm.block.client.port"; + public static final int OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT = 9863; + + // Container service client + public static final String OZONE_SCM_CLIENT_ADDRESS_KEY = + "ozone.scm.client.address"; + public static final String OZONE_SCM_CLIENT_BIND_HOST_KEY = + "ozone.scm.client.bind.host"; + public static final String OZONE_SCM_CLIENT_BIND_HOST_DEFAULT = + "0.0.0.0"; + + // Block service client + public static final String OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY = + "ozone.scm.block.client.address"; + public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY = + "ozone.scm.block.client.bind.host"; + public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT = + "0.0.0.0"; + + public static final String OZONE_SCM_DATANODE_ADDRESS_KEY = + "ozone.scm.datanode.address"; + public static final String OZONE_SCM_DATANODE_BIND_HOST_KEY = + "ozone.scm.datanode.bind.host"; + public static final String OZONE_SCM_DATANODE_BIND_HOST_DEFAULT = + "0.0.0.0"; + + public static final String OZONE_SCM_HTTP_ENABLED_KEY = + "ozone.scm.http.enabled"; + public static final String OZONE_SCM_HTTP_BIND_HOST_KEY = + "ozone.scm.http-bind-host"; + public static final String OZONE_SCM_HTTPS_BIND_HOST_KEY = + "ozone.scm.https-bind-host"; + public static final String OZONE_SCM_HTTP_ADDRESS_KEY = + "ozone.scm.http-address"; + public static final String OZONE_SCM_HTTPS_ADDRESS_KEY = + "ozone.scm.https-address"; + public static final String OZONE_SCM_KEYTAB_FILE = + "ozone.scm.keytab.file"; + public static final String 
OZONE_SCM_HTTP_BIND_HOST_DEFAULT = "0.0.0.0"; + public static final int OZONE_SCM_HTTP_BIND_PORT_DEFAULT = 9876; + public static final int OZONE_SCM_HTTPS_BIND_PORT_DEFAULT = 9877; + + public static final String HDDS_REST_HTTP_ADDRESS_KEY = + "hdds.rest.http-address"; + public static final String HDDS_REST_HTTP_ADDRESS_DEFAULT = "0.0.0.0:9880"; + public static final String HDDS_REST_CSRF_ENABLED_KEY = + "hdds.rest.rest-csrf.enabled"; + public static final boolean HDDS_REST_CSRF_ENABLED_DEFAULT = false; + public static final String HDDS_REST_NETTY_HIGH_WATERMARK = + "hdds.rest.netty.high.watermark"; + public static final int HDDS_REST_NETTY_HIGH_WATERMARK_DEFAULT = 65536; + public static final int HDDS_REST_NETTY_LOW_WATERMARK_DEFAULT = 32768; + public static final String HDDS_REST_NETTY_LOW_WATERMARK = + "hdds.rest.netty.low.watermark"; + + public static final String OZONE_SCM_HANDLER_COUNT_KEY = + "ozone.scm.handler.count.key"; + public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10; + + public static final String OZONE_SCM_HEARTBEAT_INTERVAL = + "ozone.scm.heartbeat.interval"; + public static final String OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT = + "30s"; + + public static final String OZONE_SCM_DEADNODE_INTERVAL = + "ozone.scm.dead.node.interval"; + public static final String OZONE_SCM_DEADNODE_INTERVAL_DEFAULT = + "10m"; + + public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS = + "ozone.scm.max.hb.count.to.process"; + public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000; + + public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL = + "ozone.scm.heartbeat.thread.interval"; + public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT = + "3s"; + + public static final String OZONE_SCM_STALENODE_INTERVAL = + "ozone.scm.stale.node.interval"; + public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT = + "90s"; + + public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = + 
"ozone.scm.heartbeat.rpc-timeout"; + public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = + 1000; + + /** + * Defines how frequently we will log the missing of heartbeat to a specific + * SCM. In the default case we will write a warning message for each 10 + * sequential heart beats that we miss to a specific SCM. This is to avoid + * overrunning the log with lots of HB missed Log statements. + */ + public static final String OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT = + "ozone.scm.heartbeat.log.warn.interval.count"; + public static final int OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT = + 10; + + // ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT. + // Written as a comma separated string. e.g. scm1, scm2:8020, 7.7.7.7:7777 + // + // If this key is not specified datanodes will not be able to find + // SCM. The SCM membership can be dynamic, so this key should contain + // all possible SCM names. Once the SCM leader is discovered datanodes will + // get the right list of SCMs to heartbeat to from the leader. + // While it is good for the datanodes to know the names of all SCM nodes, + // it is sufficient to actually know the name of one working SCM. That SCM + // will be able to return the information about other SCMs that are part of + // the SCM replicated Log. + // + // In case of a membership change, any one of the SCM machines will be + // able to send back a new list to the datanodes. + public static final String OZONE_SCM_NAMES = "ozone.scm.names"; + + public static final int OZONE_SCM_DEFAULT_PORT = + OZONE_SCM_DATANODE_PORT_DEFAULT; + // File Name and path where datanode ID is to be written to. + // If this value is not set then container startup will fail. 
+ public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id"; + + public static final String OZONE_SCM_DATANODE_ID_PATH_DEFAULT = "datanode.id"; + + public static final String OZONE_SCM_DB_CACHE_SIZE_MB = + "ozone.scm.db.cache.size.mb"; + public static final int OZONE_SCM_DB_CACHE_SIZE_DEFAULT = 128; + + public static final String OZONE_SCM_CONTAINER_SIZE_GB = + "ozone.scm.container.size.gb"; + public static final int OZONE_SCM_CONTAINER_SIZE_DEFAULT = 5; + + public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY = + "ozone.scm.container.placement.impl"; + + public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE = + "ozone.scm.container.provision_batch_size"; + public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 20; + + public static final String OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY = + "ozone.scm.container.deletion-choosing.policy"; + + public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT = + "ozone.scm.container.creation.lease.timeout"; + + public static final String + OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s"; + + /** + * Don't start processing a pool if we have not had a minimum number of + * seconds from the last processing. + */ + public static final String OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL = + "ozone.scm.container.report.processing.interval"; + public static final String + OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s"; + + /** + * This determines the total number of pools to be processed in parallel. + */ + public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS = + "ozone.scm.max.nodepool.processing.threads"; + public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1; + /** + * These 2 settings control the number of threads in executor pool and time + * outs for the container reports from all nodes. 
+ */ + public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS = + "ozone.scm.max.container.report.threads"; + public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100; + public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT = + "ozone.scm.container.reports.wait.timeout"; + public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT = + "5m"; + + public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY = + "ozone.scm.block.deletion.max.retry"; + public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096; + + // Once a container usage crosses this threshold, it is eligible for + // closing. + public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD = + "ozone.scm.container.close.threshold"; + public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; + /** + * Never constructed. + */ + private ScmConfigKeys() { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java new file mode 100644 index 0000000..6236feb --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +/** + * ScmInfo wraps the result returned from SCM#getScmInfo which + * contains clusterId and the SCM Id. + */ +public final class ScmInfo { + private String clusterId; + private String scmId; + + /** + * Builder for ScmInfo. + */ + public static class Builder { + private String clusterId; + private String scmId; + + /** + * sets the cluster id. + * @param cid clusterId to be set + * @return Builder for ScmInfo + */ + public Builder setClusterId(String cid) { + this.clusterId = cid; + return this; + } + + /** + * sets the scmId. + * @param id scmId + * @return Builder for scmInfo + */ + public Builder setScmId(String id) { + this.scmId = id; + return this; + } + + public ScmInfo build() { + return new ScmInfo(clusterId, scmId); + } + } + + private ScmInfo(String clusterId, String scmId) { + this.clusterId = clusterId; + this.scmId = scmId; + } + + /** + * Gets the clusterId from the Version file. + * @return ClusterId + */ + public String getClusterId() { + return clusterId; + } + + /** + * Gets the SCM Id from the Version file. 
+ * @return SCM Id + */ + public String getScmId() { + return scmId; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java new file mode 100644 index 0000000..c96f79b --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A Client for the storageContainer protocol. + */ +public abstract class XceiverClientSpi implements Closeable { + + final private AtomicInteger referenceCount; + private boolean isEvicted; + + XceiverClientSpi() { + this.referenceCount = new AtomicInteger(0); + this.isEvicted = false; + } + + void incrementReference() { + this.referenceCount.incrementAndGet(); + } + + void decrementReference() { + this.referenceCount.decrementAndGet(); + cleanup(); + } + + void setEvicted() { + isEvicted = true; + cleanup(); + } + + // close the xceiverClient only if, + // 1) there is no refcount on the client + // 2) it has been evicted from the cache. + private void cleanup() { + if (referenceCount.get() == 0 && isEvicted) { + close(); + } + } + + @VisibleForTesting + public int getRefcount() { + return referenceCount.get(); + } + + /** + * Connects to the leader in the pipeline. + */ + public abstract void connect() throws Exception; + + @Override + public abstract void close(); + + /** + * Returns the pipeline of machines that host the container used by this + * client. + * + * @return pipeline of machines that host the container + */ + public abstract Pipeline getPipeline(); + + /** + * Sends a given command to server and gets the reply back. 
+ * @param request Request + * @return Response to the command + * @throws IOException + */ + public abstract ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException; + + /** + * Sends a given command to the server and gets a waitable future back. + * + * @param request Request + * @return Future that completes with the response to the command + * @throws IOException + */ + public abstract CompletableFuture<ContainerCommandResponseProto> + sendCommandAsync(ContainerCommandRequestProto request) + throws IOException, ExecutionException, InterruptedException; + + /** + * Create a pipeline. + * + * @param pipelineID - Name of the pipeline. + * @param datanodes - Datanodes + */ + public abstract void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) throws IOException; + + /** + * Returns pipeline Type. + * + * @return - {Stand_Alone, Ratis or Chained} + */ + public abstract HddsProtos.ReplicationType getPipelineType(); +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org