http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java new file mode 100644 index 0000000..b90b08f --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Interface for a key-value store that stores Ozone metadata. + * Ozone metadata is stored as key-value pairs; both key and value + * are arbitrary byte arrays. + */ +@InterfaceStability.Evolving +public interface MetadataStore extends Closeable { + + /** + * Puts a key-value pair into the store. + * + * @param key metadata key + * @param value metadata value + */ + void put(byte[] key, byte[] value) throws IOException; + + /** + * @return true if the metadata store is empty. + * + * @throws IOException + */ + boolean isEmpty() throws IOException; + + /** + * Returns the value mapped to the given key as a byte array. + * + * @param key metadata key + * @return value in byte array + * @throws IOException + */ + byte[] get(byte[] key) throws IOException; + + /** + * Deletes a key from the metadata store. + * + * @param key metadata key + * @throws IOException + */ + void delete(byte[] key) throws IOException; + + /** + * Returns a range of key-value pairs as a list, based on a + * startKey and count. Further, a {@link MetadataKeyFilter} can be added to + * filter keys if necessary. To prevent race conditions while listing + * entries, this implementation takes a snapshot and lists the entries from + * the snapshot. On the other hand, this may cause the returned range to + * differ slightly from the actual data if the data is being updated + * concurrently. + * <p> + * If the startKey is specified and found in levelDB, this key and the keys + * after this key will be included in the result. If the startKey is null, + * all entries will be included as long as other conditions are satisfied. + * If the given startKey doesn't exist, an empty list will be returned. + * <p> + * The count argument limits the total number of entries to return; + * the value for count must be an integer greater than 0.
 + * <p> + * This method allows specifying one or more {@link MetadataKeyFilter} + * to filter keys by certain conditions. Once given, only the entries + * whose key passes all the filters will be included in the result. + * + * @param startKey a start key. + * @param count max number of entries to return. + * @param filters one or more customized {@link MetadataKeyFilter}. + * @return a list of entries found in the database or an empty list if the + * startKey is invalid. + * @throws IOException if there are I/O errors. + * @throws IllegalArgumentException if count is less than 0. + */ + List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, + int count, MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException; + + /** + * This method is very similar to {@link #getRangeKVs}, the only + * difference is that this method returns a sequential range + * of elements based on the filters. While iterating the elements, + * if it meets any entry that cannot pass the filters, the iteration stops + * at that point without looking for the next match. If no filter is given, + * this method behaves just like {@link #getRangeKVs}. + * + * @param startKey a start key. + * @param count max number of entries to return. + * @param filters one or more customized {@link MetadataKeyFilter}. + * @return a list of entries found in the database. + * @throws IOException + * @throws IllegalArgumentException + */ + List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey, + int count, MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException; + + /** + * A batch of PUT and DELETE operations handled as a single atomic write. + * + * @throws IOException if the write fails + */ + void writeBatch(BatchOperation operation) throws IOException; + + /** + * Compacts the entire database. + * @throws IOException + */ + void compactDB() throws IOException; + + /** + * Destroys the content of the specified database; + * a destroyed database cannot be loaded again. + * Be very careful with this method. + * + * @throws IOException if an I/O error happens + */ + void destroy() throws IOException; + + /** + * Seeks the database to a certain key and returns the key-value + * pair around this key based on the given offset. Note that this method + * only supports offsets of -1 (left), 0 (current) and 1 (right); + * any other offset will cause an {@link IllegalArgumentException}. + * + * @param offset offset to the key + * @param from from which key + * @return a key-value pair + * @throws IOException + */ + ImmutablePair<byte[], byte[]> peekAround(int offset, byte[] from) + throws IOException, IllegalArgumentException; + + /** + * Iterates entries in the database from a certain key. + * Applies the given {@link EntryConsumer} to the key and value of + * each entry; the function produces a boolean result which is used + * as the criterion to exit from iteration. + * + * @param from the start key + * @param consumer + * an {@link EntryConsumer} applied to each key and value. If the consumer + * returns true, the iteration continues to the next entry; otherwise it + * exits. + * @throws IOException + */ + void iterate(byte[] from, EntryConsumer consumer) + throws IOException; +}
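The MetadataStore interface above is typically obtained through the MetadataStoreBuilder added later in this patch. The following is a minimal usage sketch and is not part of the patch itself: the database path, keys, values, and cache size are made-up example values, and it assumes the backing engine (LevelDB or RocksDB) is selected via OzoneConfigKeys.OZONE_METADATA_STORE_IMPL in the supplied Configuration.

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;

public class MetadataStoreExample {
  public static void main(String[] args) throws Exception {
    // The store implementation (LevelDB or RocksDB) is picked from the
    // configuration; the default applies when the key is not set.
    Configuration conf = new Configuration();
    MetadataStore store = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setDbFile(new File("/tmp/example-container.db")) // hypothetical path
        .setCacheSize(16L * 1024 * 1024)                   // example value: 16 MB cache
        .setCreateIfMissing(true)
        .build();
    try {
      byte[] key = "exampleKey".getBytes(StandardCharsets.UTF_8);
      byte[] value = "exampleValue".getBytes(StandardCharsets.UTF_8);
      store.put(key, value);

      byte[] readBack = store.get(key); // returns null if the key is absent
      // List up to 10 entries starting from the beginning (startKey == null);
      // MetadataKeyFilter varargs may be passed to restrict the keys returned.
      List<Map.Entry<byte[], byte[]>> range = store.getRangeKVs(null, 10);
      System.out.println("read " + readBack.length + " bytes, listed "
          + range.size() + " entries");
    } finally {
      store.close(); // MetadataStore extends Closeable
    }
  }
}

Since MetadataStore extends Closeable, the store could equally be managed with try-with-resources.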
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java new file mode 100644 index 0000000..9e9c32a --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.iq80.leveldb.Options; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.Statistics; +import org.rocksdb.StatsLevel; + +import java.io.File; +import java.io.IOException; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_METADATA_STORE_IMPL_LEVELDB; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_METADATA_STORE_IMPL_ROCKSDB; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_METADATA_STORE_ROCKSDB_STATISTICS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF; + +/** + * Builder for metadata store. + */ +public class MetadataStoreBuilder { + + private File dbFile; + private long cacheSize; + private boolean createIfMissing = true; + private Configuration conf; + + public static MetadataStoreBuilder newBuilder() { + return new MetadataStoreBuilder(); + } + + public MetadataStoreBuilder setDbFile(File dbPath) { + this.dbFile = dbPath; + return this; + } + + public MetadataStoreBuilder setCacheSize(long cache) { + this.cacheSize = cache; + return this; + } + + public MetadataStoreBuilder setCreateIfMissing(boolean doCreate) { + this.createIfMissing = doCreate; + return this; + } + + public MetadataStoreBuilder setConf(Configuration configuration) { + this.conf = configuration; + return this; + } + + public MetadataStore build() throws IOException { + if (dbFile == null) { + throw new IllegalArgumentException("Failed to build metadata store, " + + "dbFile is required but not found"); + } + + // Build db store based on configuration + MetadataStore store = null; + String impl = conf == null ? 
+ OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT : + conf.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT); + if (OZONE_METADATA_STORE_IMPL_LEVELDB.equals(impl)) { + Options options = new Options(); + options.createIfMissing(createIfMissing); + if (cacheSize > 0) { + options.cacheSize(cacheSize); + } + store = new LevelDBStore(dbFile, options); + } else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) { + org.rocksdb.Options opts = new org.rocksdb.Options(); + opts.setCreateIfMissing(createIfMissing); + + if (cacheSize > 0) { + BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); + tableConfig.setBlockCacheSize(cacheSize); + opts.setTableFormatConfig(tableConfig); + } + + String rocksDbStat = conf == null ? + OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT : + conf.getTrimmed(OZONE_METADATA_STORE_ROCKSDB_STATISTICS, + OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT); + + if (!rocksDbStat.equals(OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF)) { + Statistics statistics = new Statistics(); + statistics.setStatsLevel(StatsLevel.valueOf(rocksDbStat)); + opts = opts.setStatistics(statistics); + + } + store = new RocksDBStore(dbFile, opts); + } else { + throw new IllegalArgumentException("Invalid argument for " + + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL + + ". Expecting " + OZONE_METADATA_STORE_IMPL_LEVELDB + + " or " + OZONE_METADATA_STORE_IMPL_ROCKSDB + + ", but met " + impl); + } + return store; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java new file mode 100644 index 0000000..a60e98d --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.utils; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; +import org.rocksdb.DbPath; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.File; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * RocksDB implementation of ozone metadata store. + */ +public class RocksDBStore implements MetadataStore { + + private static final Logger LOG = + LoggerFactory.getLogger(RocksDBStore.class); + + private RocksDB db = null; + private File dbLocation; + private WriteOptions writeOptions; + private Options dbOptions; + private ObjectName statMBeanName; + + public RocksDBStore(File dbFile, Options options) + throws IOException { + Preconditions.checkNotNull(dbFile, "DB file location cannot be null"); + RocksDB.loadLibrary(); + dbOptions = options; + dbLocation = dbFile; + writeOptions = new WriteOptions(); + try { + + db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath()); + if (dbOptions.statistics() != null) { + + Map<String, String> jmxProperties = new HashMap<String, String>(); + jmxProperties.put("dbName", dbFile.getName()); + statMBeanName = MBeans.register("Ozone", "RocksDbStore", jmxProperties, + new RocksDBStoreMBean(dbOptions.statistics())); + if (statMBeanName == null) { + LOG.warn("jmx registration failed during RocksDB init, db path :{}", + dbFile.getAbsolutePath()); + } + } + } catch (RocksDBException e) { + throw new IOException( + "Failed init RocksDB, db path : " + dbFile.getAbsolutePath(), e); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("RocksDB successfully opened."); + LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath()); + LOG.debug("[Option] createIfMissing = {}", options.createIfMissing()); + LOG.debug("[Option] compactionPriority= {}", options.compactionStyle()); + LOG.debug("[Option] compressionType= {}", options.compressionType()); + LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles()); + LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize()); + } + } + + private IOException toIOException(String msg, RocksDBException e) { + String statusCode = e.getStatus() == null ? "N/A" : + e.getStatus().getCodeString(); + String errMessage = e.getMessage() == null ? 
"Unknown error" : + e.getMessage(); + String output = msg + "; status : " + statusCode + + "; message : " + errMessage; + return new IOException(output, e); + } + + @Override + public void put(byte[] key, byte[] value) throws IOException { + try { + db.put(writeOptions, key, value); + } catch (RocksDBException e) { + throw toIOException("Failed to put key-value to metadata store", e); + } + } + + @Override + public boolean isEmpty() throws IOException { + RocksIterator it = null; + try { + it = db.newIterator(); + it.seekToFirst(); + return !it.isValid(); + } finally { + if (it != null) { + it.close(); + } + } + } + + @Override + public byte[] get(byte[] key) throws IOException { + try { + return db.get(key); + } catch (RocksDBException e) { + throw toIOException("Failed to get the value for the given key", e); + } + } + + @Override + public void delete(byte[] key) throws IOException { + try { + db.delete(key); + } catch (RocksDBException e) { + throw toIOException("Failed to delete the given key", e); + } + } + + @Override + public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, + int count, MetadataKeyFilters.MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException { + return getRangeKVs(startKey, count, false, filters); + } + + @Override + public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey, + int count, MetadataKeyFilters.MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException { + return getRangeKVs(startKey, count, true, filters); + } + + private List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, + int count, boolean sequential, + MetadataKeyFilters.MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException { + List<Map.Entry<byte[], byte[]>> result = new ArrayList<>(); + long start = System.currentTimeMillis(); + if (count < 0) { + throw new IllegalArgumentException( + "Invalid count given " + count + ", count must be greater than 0"); + } + RocksIterator it = null; + try { + it = db.newIterator(); + if (startKey == null) { + it.seekToFirst(); + } else { + if(get(startKey) == null) { + // Key not found, return empty list + return result; + } + it.seek(startKey); + } + while(it.isValid() && result.size() < count) { + byte[] currentKey = it.key(); + byte[] currentValue = it.value(); + + it.prev(); + final byte[] prevKey = it.isValid() ? it.key() : null; + + it.seek(currentKey); + it.next(); + final byte[] nextKey = it.isValid() ? it.key() : null; + + if (filters == null) { + result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey, + currentValue)); + } else { + if (Arrays.asList(filters).stream() + .allMatch(entry -> entry.filterKey(prevKey, + currentKey, nextKey))) { + result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey, + currentValue)); + } else { + if (result.size() > 0 && sequential) { + // if the caller asks for a sequential range of results, + // and we met a dis-match, abort iteration from here. + // if result is empty, we continue to look for the first match. 
+ break; + } + } + } + } + } finally { + if (it != null) { + it.close(); + } + long end = System.currentTimeMillis(); + long timeConsumed = end - start; + if (LOG.isDebugEnabled()) { + if (filters != null) { + for (MetadataKeyFilters.MetadataKeyFilter filter : filters) { + int scanned = filter.getKeysScannedNum(); + int hinted = filter.getKeysHintedNum(); + if (scanned > 0 || hinted > 0) { + LOG.debug( + "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}", + filter.getClass().getSimpleName(), filter.getKeysScannedNum(), + filter.getKeysHintedNum()); + } + } + } + LOG.debug("Time consumed for getRangeKVs() is {}ms," + + " result length is {}.", timeConsumed, result.size()); + } + } + return result; + } + + @Override + public void writeBatch(BatchOperation operation) + throws IOException { + List<BatchOperation.SingleOperation> operations = + operation.getOperations(); + if (!operations.isEmpty()) { + try (WriteBatch writeBatch = new WriteBatch()) { + for (BatchOperation.SingleOperation opt : operations) { + switch (opt.getOpt()) { + case DELETE: + writeBatch.remove(opt.getKey()); + break; + case PUT: + writeBatch.put(opt.getKey(), opt.getValue()); + break; + default: + throw new IllegalArgumentException("Invalid operation " + + opt.getOpt()); + } + } + db.write(writeOptions, writeBatch); + } catch (RocksDBException e) { + throw toIOException("Batch write operation failed", e); + } + } + } + + @Override + public void compactDB() throws IOException { + if (db != null) { + try { + db.compactRange(); + } catch (RocksDBException e) { + throw toIOException("Failed to compact db", e); + } + } + } + + private void deleteQuietly(File fileOrDir) { + if (fileOrDir != null && fileOrDir.exists()) { + try { + FileUtils.forceDelete(fileOrDir); + } catch (IOException e) { + LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e); + } + } + } + + @Override + public void destroy() throws IOException { + // Make sure db is closed. + close(); + + // There is no destroydb java API available, + // equivalently we can delete all db directories. + deleteQuietly(dbLocation); + deleteQuietly(new File(dbOptions.dbLogDir())); + deleteQuietly(new File(dbOptions.walDir())); + List<DbPath> dbPaths = dbOptions.dbPaths(); + if (dbPaths != null) { + dbPaths.forEach(dbPath -> { + deleteQuietly(new File(dbPath.toString())); + }); + } + } + + @Override + public ImmutablePair<byte[], byte[]> peekAround(int offset, + byte[] from) throws IOException, IllegalArgumentException { + RocksIterator it = null; + try { + it = db.newIterator(); + if (from == null) { + it.seekToFirst(); + } else { + it.seek(from); + } + if (!it.isValid()) { + return null; + } + + switch (offset) { + case 0: + break; + case 1: + it.next(); + break; + case -1: + it.prev(); + break; + default: + throw new IllegalArgumentException( + "Position can only be -1, 0 " + "or 1, but found " + offset); + } + return it.isValid() ? 
new ImmutablePair<>(it.key(), it.value()) : null; + } finally { + if (it != null) { + it.close(); + } + } + } + + @Override + public void iterate(byte[] from, EntryConsumer consumer) + throws IOException { + RocksIterator it = null; + try { + it = db.newIterator(); + if (from != null) { + it.seek(from); + } else { + it.seekToFirst(); + } + while (it.isValid()) { + if (!consumer.consume(it.key(), it.value())) { + break; + } + it.next(); + } + } finally { + if (it != null) { + it.close(); + } + } + } + + @Override + public void close() throws IOException { + if (statMBeanName != null) { + MBeans.unregister(statMBeanName); + } + if (db != null) { + db.close(); + } + + } + + @VisibleForTesting + protected ObjectName getStatMBeanName() { + return statMBeanName; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java new file mode 100644 index 0000000..88c093e --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import org.rocksdb.HistogramData; +import org.rocksdb.HistogramType; +import org.rocksdb.Statistics; +import org.rocksdb.TickerType; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.DynamicMBean; +import javax.management.InvalidAttributeValueException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.ReflectionException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Adapter JMX bean to publish all the Rocksdb metrics. 
+ */ +public class RocksDBStoreMBean implements DynamicMBean { + + private Statistics statistics; + + private Set<String> histogramAttributes = new HashSet<>(); + + public RocksDBStoreMBean(Statistics statistics) { + this.statistics = statistics; + histogramAttributes.add("Average"); + histogramAttributes.add("Median"); + histogramAttributes.add("Percentile95"); + histogramAttributes.add("Percentile99"); + histogramAttributes.add("StandardDeviation"); + } + + @Override + public Object getAttribute(String attribute) + throws AttributeNotFoundException, MBeanException, ReflectionException { + for (String histogramAttribute : histogramAttributes) { + if (attribute.endsWith("_" + histogramAttribute.toUpperCase())) { + String keyName = attribute + .substring(0, attribute.length() - histogramAttribute.length() - 1); + try { + HistogramData histogram = + statistics.getHistogramData(HistogramType.valueOf(keyName)); + try { + Method method = + HistogramData.class.getMethod("get" + histogramAttribute); + return method.invoke(histogram); + } catch (Exception e) { + throw new ReflectionException(e, + "Can't read attribute " + attribute); + } + } catch (IllegalArgumentException exception) { + throw new AttributeNotFoundException( + "No such attribute in RocksDB stats: " + attribute); + } + } + } + try { + return statistics.getTickerCount(TickerType.valueOf(attribute)); + } catch (IllegalArgumentException ex) { + throw new AttributeNotFoundException( + "No such attribute in RocksDB stats: " + attribute); + } + } + + @Override + public void setAttribute(Attribute attribute) + throws AttributeNotFoundException, InvalidAttributeValueException, + MBeanException, ReflectionException { + + } + + @Override + public AttributeList getAttributes(String[] attributes) { + AttributeList result = new AttributeList(); + for (String attributeName : attributes) { + try { + Object value = getAttribute(attributeName); + result.add(value); + } catch (Exception e) { + //TODO + } + } + return result; + } + + @Override + public AttributeList setAttributes(AttributeList attributes) { + return null; + } + + @Override + public Object invoke(String actionName, Object[] params, String[] signature) + throws MBeanException, ReflectionException { + return null; + } + + @Override + public MBeanInfo getMBeanInfo() { + + List<MBeanAttributeInfo> attributes = new ArrayList<>(); + for (TickerType tickerType : TickerType.values()) { + attributes.add(new MBeanAttributeInfo(tickerType.name(), "long", + "RocksDBStat: " + tickerType.name(), true, false, false)); + } + for (HistogramType histogramType : HistogramType.values()) { + for (String histogramAttribute : histogramAttributes) { + attributes.add(new MBeanAttributeInfo( + histogramType.name() + "_" + histogramAttribute.toUpperCase(), + "long", "RocksDBStat: " + histogramType.name(), true, false, + false)); + } + } + + return new MBeanInfo("", "RocksDBStat", + attributes.toArray(new MBeanAttributeInfo[0]), null, null, null); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java new file mode 100644 index 0000000..4466337 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.utils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java new file mode 100644 index 0000000..3a55831 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis; + +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.util.SizeInBytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Ratis helper methods. 
+ */ +public interface RatisHelper { + Logger LOG = LoggerFactory.getLogger(RatisHelper.class); + + static String toRaftPeerIdString(DatanodeDetails id) { + return id.getUuidString() + "_" + id.getRatisPort(); + } + + static String toRaftPeerAddressString(DatanodeDetails id) { + return id.getIpAddress() + ":" + id.getRatisPort(); + } + + static RaftPeerId toRaftPeerId(DatanodeDetails id) { + return RaftPeerId.valueOf(toRaftPeerIdString(id)); + } + + static RaftPeer toRaftPeer(DatanodeDetails id) { + return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id)); + } + + static List<RaftPeer> toRaftPeers(Pipeline pipeline) { + return toRaftPeers(pipeline.getMachines()); + } + + static <E extends DatanodeDetails> List<RaftPeer> toRaftPeers( + List<E> datanodes) { + return datanodes.stream().map(RatisHelper::toRaftPeer) + .collect(Collectors.toList()); + } + + /* TODO: use a dummy id for all groups for the moment. + * It should be changed to a unique id for each group. + */ + RaftGroupId DUMMY_GROUP_ID = + RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup")); + + RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID, + Collections.emptyList()); + + static RaftGroup emptyRaftGroup() { + return EMPTY_GROUP; + } + + static RaftGroup newRaftGroup(List<DatanodeDetails> datanodes) { + final List<RaftPeer> newPeers = datanodes.stream() + .map(RatisHelper::toRaftPeer) + .collect(Collectors.toList()); + return RatisHelper.newRaftGroup(newPeers); + } + + static RaftGroup newRaftGroup(Collection<RaftPeer> peers) { + return peers.isEmpty()? emptyRaftGroup() + : new RaftGroup(DUMMY_GROUP_ID, peers); + } + + static RaftGroup newRaftGroup(Pipeline pipeline) { + return newRaftGroup(toRaftPeers(pipeline)); + } + + static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) { + return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()), + newRaftGroup(pipeline)); + } + + static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) { + return newRaftClient(rpcType, leader.getId(), + newRaftGroup(new ArrayList<>(Arrays.asList(leader)))); + } + + static RaftClient newRaftClient( + RpcType rpcType, RaftPeerId leader, RaftGroup group) { + LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); + final RaftProperties properties = new RaftProperties(); + RaftConfigKeys.Rpc.setType(properties, rpcType); + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)); + + return RaftClient.newBuilder() + .setRaftGroup(group) + .setLeaderId(leader) + .setProperties(properties) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java new file mode 100644 index 0000000..c13c20c --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +/** + * This package contains classes related to Apache Ratis. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java new file mode 100644 index 0000000..29242ad --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.shaded.com.google.protobuf; + +/** Utilities for the shaded protobuf in Ratis. */ +public interface ShadedProtoUtil { + /** + * @param bytes + * @return the wrapped shaded {@link ByteString} (no copying). + */ + static ByteString asShadedByteString(byte[] bytes) { + return ByteString.wrap(bytes); + } + + /** + * @param shaded + * @return a {@link com.google.protobuf.ByteString} (requires copying). + */ + static com.google.protobuf.ByteString asByteString(ByteString shaded) { + return com.google.protobuf.ByteString.copyFrom( + shaded.asReadOnlyByteBuffer()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java new file mode 100644 index 0000000..032dd96 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.shaded.com.google.protobuf; + +/** + * This package contains classes related to the shaded protobuf in Apache Ratis. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto new file mode 100644 index 0000000..a6270ef --- /dev/null +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and Unstable. + * Please see http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/InterfaceClassification.html + * for what changes are allowed for a *Unstable* .proto interface. + */ + +// This file contains protocol buffers that are used to transfer data +// to and from the datanode. +option java_package = "org.apache.hadoop.hdds.protocol.proto"; +option java_outer_classname = "ContainerProtos"; +option java_generate_equals_and_hash = true; +package hadoop.hdds; +import "hdfs.proto"; +import "hdds.proto"; + +/** + * Commands that are used to manipulate the state of containers on a datanode. + * + * These commands allow us to work against the datanode - from + * StorageContainer Manager as well as clients. + * + * 1. CreateContainer - This call is usually made by Storage Container + * manager, when we need to create a new container on a given datanode. + * + * 2. ReadContainer - Allows end user to stat a container. For example + * this allows us to return the metadata of a container. + * + * 3. UpdateContainer - Updates a container metadata. + + * 4. DeleteContainer - This call is made to delete a container. + * + * 5. ListContainer - Returns the list of containers on this + * datanode. This will be used by tests and tools. + * + * 6. PutKey - Given a valid container, creates a key. + * + * 7. GetKey - Allows user to read the metadata of a Key. + * + * 8. DeleteKey - Deletes a given key. + * + * 9. ListKey - Returns a list of keys that are present inside + * a given container. + * + * 10. ReadChunk - Allows us to read a chunk. + * + * 11. DeleteChunk - Delete an unused chunk. 
+ * + * 12. WriteChunk - Allows us to write a chunk + * + * 13. ListChunk - Given a Container/Key returns the list of Chunks. + * + * 14. CompactChunk - Re-writes a chunk based on Offsets. + * + * 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk. + * + * 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk. + * + * 17. CloseContainer - Closes an open container and makes it immutable. + * + * 18. CopyContainer - Copies a container from a remote machine. + */ + +enum Type { + CreateContainer = 1; + ReadContainer = 2; + UpdateContainer = 3; + DeleteContainer = 4; + ListContainer = 5; + + PutKey = 6; + GetKey = 7; + DeleteKey = 8; + ListKey = 9; + + ReadChunk = 10; + DeleteChunk = 11; + WriteChunk = 12; + ListChunk = 13; + CompactChunk = 14; + + /** Combines Key and Chunk Operation into Single RPC. */ + PutSmallFile = 15; + GetSmallFile = 16; + CloseContainer = 17; + +} + + +enum Result { + SUCCESS = 1; + UNSUPPORTED_REQUEST = 2; + MALFORMED_REQUEST = 3; + CONTAINER_INTERNAL_ERROR = 4; + INVALID_CONFIG = 5; + INVALID_FILE_HASH_FOUND = 6; + CONTAINER_EXISTS = 7; + NO_SUCH_ALGORITHM = 8; + CONTAINER_NOT_FOUND = 9; + IO_EXCEPTION = 10; + UNABLE_TO_READ_METADATA_DB = 11; + NO_SUCH_KEY = 12; + OVERWRITE_FLAG_REQUIRED = 13; + UNABLE_TO_FIND_DATA_DIR = 14; + INVALID_WRITE_SIZE = 15; + CHECKSUM_MISMATCH = 16; + UNABLE_TO_FIND_CHUNK = 17; + PROTOC_DECODING_ERROR = 18; + INVALID_ARGUMENT = 19; + PUT_SMALL_FILE_ERROR = 20; + GET_SMALL_FILE_ERROR = 21; + CLOSED_CONTAINER_IO = 22; + ERROR_CONTAINER_NOT_EMPTY = 23; + ERROR_IN_COMPACT_DB = 24; + UNCLOSED_CONTAINER_IO = 25; + DELETE_ON_OPEN_CONTAINER = 26; + CLOSED_CONTAINER_RETRY = 27; +} + +message ContainerCommandRequestProto { + required Type cmdType = 1; // Type of the command + + // A string that identifies this command, we generate Trace ID in Ozone + // frontend and this allows us to trace that command all over ozone. + optional string traceID = 2; + + // One of the following command is available when the corresponding + // cmdType is set. At the protocol level we allow only + // one command in each packet. + // TODO : Upgrade to Protobuf 2.6 or later. 
+ optional CreateContainerRequestProto createContainer = 3; + optional ReadContainerRequestProto readContainer = 4; + optional UpdateContainerRequestProto updateContainer = 5; + optional DeleteContainerRequestProto deleteContainer = 6; + optional ListContainerRequestProto listContainer = 7; + + optional PutKeyRequestProto putKey = 8; + optional GetKeyRequestProto getKey = 9; + optional DeleteKeyRequestProto deleteKey = 10; + optional ListKeyRequestProto listKey = 11; + + optional ReadChunkRequestProto readChunk = 12; + optional WriteChunkRequestProto writeChunk = 13; + optional DeleteChunkRequestProto deleteChunk = 14; + optional ListChunkRequestProto listChunk = 15; + + optional PutSmallFileRequestProto putSmallFile = 16; + optional GetSmallFileRequestProto getSmallFile = 17; + optional CloseContainerRequestProto closeContainer = 18; + required string datanodeUuid = 19; +} + +message ContainerCommandResponseProto { + required Type cmdType = 1; + optional string traceID = 2; + + optional CreateContainerResponseProto createContainer = 3; + optional ReadContainerResponseProto readContainer = 4; + optional UpdateContainerResponseProto updateContainer = 5; + optional DeleteContainerResponseProto deleteContainer = 6; + optional ListContainerResponseProto listContainer = 7; + + optional PutKeyResponseProto putKey = 8; + optional GetKeyResponseProto getKey = 9; + optional DeleteKeyResponseProto deleteKey = 10; + optional ListKeyResponseProto listKey = 11; + + optional WriteChunkResponseProto writeChunk = 12; + optional ReadChunkResponseProto readChunk = 13; + optional DeleteChunkResponseProto deleteChunk = 14; + optional ListChunkResponseProto listChunk = 15; + + required Result result = 17; + optional string message = 18; + + optional PutSmallFileResponseProto putSmallFile = 19; + optional GetSmallFileResponseProto getSmallFile = 20; + optional CloseContainerResponseProto closeContainer = 21; + +} + +message ContainerData { + required string name = 1; + repeated KeyValue metadata = 2; + optional string dbPath = 3; + optional string containerPath = 4; + optional string hash = 6; + optional int64 bytesUsed = 7; + optional int64 size = 8; + optional int64 keyCount = 9; + //TODO: change required after we switch container ID from string to long + optional int64 containerID = 10; + optional LifeCycleState state = 11 [default = OPEN]; +} + +message ContainerMeta { + required string fileName = 1; + required string hash = 2; +} + +// Container Messages. +message CreateContainerRequestProto { + required Pipeline pipeline = 1; + required ContainerData containerData = 2; +} + +message CreateContainerResponseProto { +} + +message ReadContainerRequestProto { + required Pipeline pipeline = 1; + required string name = 2; +} + +message ReadContainerResponseProto { + optional ContainerData containerData = 2; +} + +message UpdateContainerRequestProto { + required Pipeline pipeline = 1; + required ContainerData containerData = 2; + optional bool forceUpdate = 3 [default = false]; +} + +message UpdateContainerResponseProto { +} + +message DeleteContainerRequestProto { + required Pipeline pipeline = 1; + required string name = 2; + optional bool forceDelete = 3 [default = false]; +} + +message DeleteContainerResponseProto { +} + +message ListContainerRequestProto { + required Pipeline pipeline = 1; + optional string prefix = 2; + required uint32 count = 3; // Max Results to return + optional string prevKey = 4; // if this is not set query from start. 
+} + +message ListContainerResponseProto { + repeated ContainerData containerData = 1; +} + +message CloseContainerRequestProto { + required Pipeline pipeline = 1; +} + +message CloseContainerResponseProto { + optional Pipeline pipeline = 1; + optional string hash = 2; +} + +message KeyData { + required string containerName = 1; + required string name = 2; + optional int64 flags = 3; // for future use. + repeated KeyValue metadata = 4; + repeated ChunkInfo chunks = 5; +} + +// Key Messages. +message PutKeyRequestProto { + required Pipeline pipeline = 1; + required KeyData keyData = 2; +} + +message PutKeyResponseProto { +} + +message GetKeyRequestProto { + required Pipeline pipeline = 1; + required KeyData keyData = 2; +} + +message GetKeyResponseProto { + required KeyData keyData = 1; +} + + +message DeleteKeyRequestProto { + required Pipeline pipeline = 1; + required string name = 2; +} + +message DeleteKeyResponseProto { +} + +message ListKeyRequestProto { + required Pipeline pipeline = 1; + optional string prefix = 2; // if specified returns keys that match prefix. + required string prevKey = 3; + required uint32 count = 4; + +} + +message ListKeyResponseProto { + repeated KeyData keyData = 1; +} + +// Chunk Operations + +message ChunkInfo { + required string chunkName = 1; + required uint64 offset = 2; + required uint64 len = 3; + optional string checksum = 4; + repeated KeyValue metadata = 5; +} + +enum Stage { + WRITE_DATA = 1; + COMMIT_DATA = 2; + COMBINED = 3; +} + +message WriteChunkRequestProto { + required Pipeline pipeline = 1; + required string keyName = 2; + required ChunkInfo chunkData = 3; + optional bytes data = 4; + optional Stage stage = 5 [default = COMBINED]; +} + +message WriteChunkResponseProto { +} + +message ReadChunkRequestProto { + required Pipeline pipeline = 1; + required string keyName = 2; + required ChunkInfo chunkData = 3; +} + +message ReadChunkResponseProto { + required Pipeline pipeline = 1; + required ChunkInfo chunkData = 2; + required bytes data = 3; +} + +message DeleteChunkRequestProto { + required Pipeline pipeline = 1; + required string keyName = 2; + required ChunkInfo chunkData = 3; +} + +message DeleteChunkResponseProto { +} + +message ListChunkRequestProto { + required Pipeline pipeline = 1; + required string keyName = 2; + required string prevChunkName = 3; + required uint32 count = 4; +} + +message ListChunkResponseProto { + repeated ChunkInfo chunkData = 1; +} + +/** For small file access combines write chunk and putKey into a single +RPC */ + +message PutSmallFileRequestProto { + required PutKeyRequestProto key = 1; + required ChunkInfo chunkInfo = 2; + required bytes data = 3; +} + + +message PutSmallFileResponseProto { + +} + +message GetSmallFileRequestProto { + required GetKeyRequestProto key = 1; +} + +message GetSmallFileResponseProto { + required ReadChunkResponseProto data = 1; +} + +message CopyContainerRequestProto { + required string containerName = 1; + required uint64 readOffset = 2; + optional uint64 len = 3; +} + +message CopyContainerResponseProto { + required string archiveName = 1; + required uint64 readOffset = 2; + required uint64 len = 3; + required bool eof = 4; + repeated bytes data = 5; + optional int64 checksum = 6; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto 
b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto new file mode 100644 index 0000000..38d2e16 --- /dev/null +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for an *unstable* .proto interface. + */ + +option java_package = "org.apache.hadoop.hdds.protocol.proto"; +option java_outer_classname = "ScmBlockLocationProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdds; + +import "hdfs.proto"; +import "hdds.proto"; + + +// SCM Block protocol +/** + * keys - batch of block keys to find + */ +message GetScmBlockLocationsRequestProto { + repeated string keys = 1; +} + +/** + * locatedBlocks - for each requested hash, nodes that currently host the + * container for that object key hash + */ +message GetScmBlockLocationsResponseProto { + repeated ScmLocatedBlockProto locatedBlocks = 1; +} + +/** + * Holds the nodes that currently host the blocks for a key. + */ +message ScmLocatedBlockProto { + required string key = 1; + required hadoop.hdds.Pipeline pipeline = 2; +} + +/** +* Request sent to SCM asking it to allocate a block of the specified size. +*/ +message AllocateScmBlockRequestProto { + required uint64 size = 1; + required ReplicationType type = 2; + required hadoop.hdds.ReplicationFactor factor = 3; + required string owner = 4; + +} + +/** + * A delete key request sent by KSM to SCM; it contains + * multiple keys (and their blocks). + */ +message DeleteScmKeyBlocksRequestProto { + repeated KeyBlocks keyBlocks = 1; +} + +/** + * An object key and all its associated blocks. + * We need to encapsulate the object key name plus the blocks in this protocol + * because SCM needs to respond to KSM with the keys it has deleted. + * If the response only contains blocks, it will be very expensive for + * KSM to figure out what keys have been deleted. + */ +message KeyBlocks { + required string key = 1; + repeated string blocks = 2; +} + +/** + * A delete key response from SCM to KSM; it contains multiple child-results. + * Each child-result represents a key deletion result; a key result is + * considered successful only if all blocks of that key are successfully + * deleted. + */ +message DeleteScmKeyBlocksResponseProto { + repeated DeleteKeyBlocksResultProto results = 1; +} + +/** + * A key deletion result. It contains all the block deletion results.
+ */ +message DeleteKeyBlocksResultProto { + required string objectKey = 1; + repeated DeleteScmBlockResult blockResults = 2; +} + +message DeleteScmBlockResult { + enum Result { + success = 1; + chillMode = 2; + errorNotFound = 3; + unknownFailure = 4; + } + required Result result = 1; + required string key = 2; +} + +/** + * Reply from SCM indicating that the container. + */ +message AllocateScmBlockResponseProto { + enum Error { + success = 1; + errorNotEnoughSpace = 2; + errorSizeTooBig = 3; + unknownFailure = 4; + } + required Error errorCode = 1; + required string key = 2; + required hadoop.hdds.Pipeline pipeline = 3; + required bool createContainer = 4; + optional string errorMessage = 5; +} + +/** + * Protocol used from KeySpaceManager to StorageContainerManager. + * See request and response messages for details of the RPC calls. + */ +service ScmBlockLocationProtocolService { + + /** + * Find the set of nodes that currently host the block, as + * identified by the key. This method supports batch lookup by + * passing multiple keys. + */ + rpc getScmBlockLocations(GetScmBlockLocationsRequestProto) + returns (GetScmBlockLocationsResponseProto); + + /** + * Creates a block entry in SCM. + */ + rpc allocateScmBlock(AllocateScmBlockRequestProto) + returns (AllocateScmBlockResponseProto); + + /** + * Deletes blocks for a set of object keys from SCM. + */ + rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto) + returns (DeleteScmKeyBlocksResponseProto); + + /** + * Gets the scmInfo from SCM. + */ + rpc getScmInfo(hadoop.hdds.GetScmInfoRequestProto) + returns (hadoop.hdds.GetScmInfoRespsonseProto); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto new file mode 100644 index 0000000..d7540a3 --- /dev/null +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *unstable* .proto interface. + */ + +option java_package = "org.apache.hadoop.hdds.protocol.proto"; +option java_outer_classname = "StorageContainerLocationProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdds; + +import "hdfs.proto"; +import "hdds.proto"; + +/** +* Request send to SCM asking where the container should be created. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
new file mode 100644
index 0000000..d7540a3
--- /dev/null
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "StorageContainerLocationProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdds;
+
+import "hdfs.proto";
+import "hdds.proto";
+
+/**
+ * Request sent to SCM asking where the container should be created.
+ */
+message ContainerRequestProto {
+  required string containerName = 1;
+  // Ozone only supports a replication factor of either 1 or 3.
+  required ReplicationFactor replicationFactor = 2;
+  required ReplicationType replicationType = 3;
+  required string owner = 4;
+}
+
+/**
+ * Reply from SCM to a container allocation request.
+ */
+message ContainerResponseProto {
+  enum Error {
+    success = 1;
+    errorContainerAlreadyExists = 2;
+    errorContainerMissing = 3;
+  }
+  required Error errorCode = 1;
+  required Pipeline pipeline = 2;
+  optional string errorMessage = 3;
+}
+
+message GetContainerRequestProto {
+  required string containerName = 1;
+}
+
+message GetContainerResponseProto {
+  required Pipeline pipeline = 1;
+}
+
+message SCMListContainerRequestProto {
+  required uint32 count = 1;
+  optional string startName = 2;
+  optional string prefixName = 3;
+}
+
+message SCMListContainerResponseProto {
+  repeated SCMContainerInfo containers = 1;
+}
+
+message SCMDeleteContainerRequestProto {
+  required string containerName = 1;
+}
+
+message SCMDeleteContainerResponseProto {
+  // Empty response
+}
+
+message ObjectStageChangeRequestProto {
+  enum Type {
+    container = 1;
+    pipeline = 2;
+  }
+  // delete/copy operations may be added later
+  enum Op {
+    create = 1;
+    close = 2;
+  }
+  enum Stage {
+    begin = 1;
+    complete = 2;
+  }
+  required string name = 1;
+  required Type type = 2;
+  required Op op = 3;
+  required Stage stage = 4;
+}
+
+message ObjectStageChangeResponseProto {
+  // Empty response
+}
+
+/*
+  NodeQueryRequest sends a request to SCM asking for a list of nodes that
+  match the NodeStates that we are requesting.
+*/
+message NodeQueryRequestProto {
+
+  // Repeated, so we can specify more than one status type.
+  // These NodeState types are additive for now, in the sense that
+  // if you specify HEALTHY and FREE_NODE members,
+  // then you get all healthy nodes which are not raft members.
+  //
+  // If you specify all healthy and dead nodes, you will get nothing
+  // back. The server is not going to dictate what combinations make sense;
+  // it is entirely up to the caller.
+  // TODO: Support operators like OR and NOT. Currently it is always an
+  // implied AND.
+
+  repeated NodeState query = 1;
+  required QueryScope scope = 2;
+  optional string poolName = 3; // if scope is pool, then the pool name is needed.
+}
+
+message NodeQueryResponseProto {
+  required NodePool datanodes = 1;
+}
+
+/**
+ * Request to create a replication pipeline.
+ */
+message PipelineRequestProto {
+  required ReplicationType replicationType = 1;
+  required ReplicationFactor replicationFactor = 2;
+
+  // if datanodes are specified then pipelines are created using those
+  // datanodes.
+  optional NodePool nodePool = 3;
+  optional string pipelineID = 4;
+}
+
+message PipelineResponseProto {
+  enum Error {
+    success = 1;
+    errorPipelineAlreadyExists = 2;
+  }
+  required Error errorCode = 1;
+  optional Pipeline pipeline = 2;
+  optional string errorMessage = 3;
+}
+
+/**
+ * Protocol used from an HDFS node to StorageContainerManager. See the request
+ * and response messages for details of the RPC calls.
+ */
+service StorageContainerLocationProtocolService {
+
+  /**
+   * Creates a container entry in SCM.
+   */
+  rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
+
+  /**
+   * Returns the pipeline for a given container.
+   */
+  rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
+
+  rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
+
+  /**
+   * Deletes a container in SCM.
+   */
+  rpc deleteContainer(SCMDeleteContainerRequestProto) returns (SCMDeleteContainerResponseProto);
+
+  /**
+   * Returns a set of nodes that meet the given criteria.
+   */
+  rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
+
+  /**
+   * Notification from the client when it begins or finishes container or
+   * pipeline operations on datanodes.
+   */
+  rpc notifyObjectStageChange(ObjectStageChangeRequestProto) returns (ObjectStageChangeResponseProto);
+
+  /*
+   * APIs that manage pipelines.
+   *
+   * Pipelines are abstractions offered by SCM and Datanode that allow users
+   * to create a replication pipeline.
+   *
+   * The following APIs allow command-line programs like the SCM CLI to list
+   * and manage pipelines.
+   */
+
+  /**
+   * Creates a replication pipeline.
+   */
+  rpc allocatePipeline(PipelineRequestProto)
+      returns (PipelineResponseProto);
+
+  /**
+   * Returns information about SCM.
+   */
+  rpc getScmInfo(GetScmInfoRequestProto)
+      returns (GetScmInfoRespsonseProto);
+}
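Along the same lines, here is a minimal sketch (again illustrative, not part of this patch) of building a container allocation request and a node query against the classes protoc would generate from StorageContainerLocationProtocol.proto; the container name and owner are hypothetical, and the enum values come from hdds.proto via the HddsProtos outer class.

// Illustrative sketch -- not part of this patch.
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;

public class ScmContainerRequestExample {
  public static void main(String[] args) {
    // Ask SCM to allocate a 3-way replicated Ratis container for a
    // hypothetical owner.
    ContainerRequestProto allocate = ContainerRequestProto.newBuilder()
        .setContainerName("container-0001")     // hypothetical name
        .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
        .setReplicationType(HddsProtos.ReplicationType.RATIS)
        .setOwner("example-owner")              // hypothetical owner
        .build();

    // Ask for all HEALTHY nodes across the cluster; the repeated query
    // field is an implied AND over the requested NodeStates.
    NodeQueryRequestProto query = NodeQueryRequestProto.newBuilder()
        .addQuery(HddsProtos.NodeState.HEALTHY)
        .setScope(HddsProtos.QueryScope.CLUSTER)
        .build();

    System.out.println(allocate);
    System.out.println(query);
  }
}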
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
new file mode 100644
index 0000000..f7b2f72
--- /dev/null
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "HddsProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdds;
+
+message DatanodeDetailsProto {
+  // TODO: make the port a separate proto message and use it here
+  required string uuid = 1;           // UUID assigned to the Datanode.
+  required string ipAddress = 2;      // IP address
+  required string hostName = 3;       // hostname
+  optional uint32 infoPort = 4;       // datanode http port
+  optional uint32 infoSecurePort = 5 [default = 0]; // datanode https port
+  optional uint32 containerPort = 6 [default = 0];  // Ozone stand_alone protocol
+  optional uint32 ratisPort = 7 [default = 0];      // Ozone ratis port
+  optional uint32 ozoneRestPort = 8 [default = 0];
+}
+
+message PipelineChannel {
+  required string leaderID = 1;
+  repeated DatanodeDetailsProto members = 2;
+  optional LifeCycleState state = 3 [default = OPEN];
+  optional ReplicationType type = 4 [default = STAND_ALONE];
+  optional ReplicationFactor factor = 5 [default = ONE];
+  optional string name = 6;
+}
+
+// A pipeline is composed of a PipelineChannel (Ratis/StandAlone) that backs a
+// container.
+message Pipeline {
+  required string containerName = 1;
+  required PipelineChannel pipelineChannel = 2;
+}
+
+message KeyValue {
+  required string key = 1;
+  optional string value = 2;
+}
+
+/**
+ * Type of the node.
+ */
+enum NodeType {
+  KSM = 1;
+  SCM = 2;
+  DATANODE = 3;
+}
+
+// Should we rename NodeState to DatanodeState?
+/**
+ * Enum that represents the Node State. This is used in calls to getNodeList
+ * and getNodeCount.
+ */
+enum NodeState {
+  HEALTHY = 1;
+  STALE = 2;
+  DEAD = 3;
+  DECOMMISSIONING = 4;
+  DECOMMISSIONED = 5;
+  RAFT_MEMBER = 6;
+  FREE_NODE = 7; // Not a member in raft.
+  INVALID = 8;
+}
+
+enum QueryScope {
+  CLUSTER = 1;
+  POOL = 2;
+}
+
+message Node {
+  required DatanodeDetailsProto nodeID = 1;
+  repeated NodeState nodeStates = 2;
+}
+
+message NodePool {
+  repeated Node nodes = 1;
+}
+
+/**
+ * LifeCycleState for the SCM object creation state machine:
+ *   ->Allocated: allocated on SCM but the client has not started creating it yet.
+ *   ->Creating: allocated and assigned to a client to create, but not ack-ed yet.
+ *   ->Open: allocated on SCM, created on datanodes and ack-ed by a client.
+ *   ->Close: container closed because its space is all used, or due to an error?
+ *   ->Timeout: container failed to be created on datanodes or ack-ed by the client.
+ *   ->Deleting (TBD): container will be deleted after a timeout.
+ * 1. ALLOCATE-ed containers on SCM can't serve key/block related operations
+ *    until ACK-ed explicitly, which changes the state to OPEN.
+ * 2. Only OPEN/CLOSED containers can serve key/block related operations.
+ * 3. ALLOCATE-ed containers that are not ACK-ed in time will be TIMEOUT and
+ *    cleaned up asynchronously.
+ */
+
+enum LifeCycleState {
+  ALLOCATED = 1;
+  CREATING = 2;  // Used for a container allocated/created by a different client.
+  OPEN = 3;      // Mostly an update to SCM via HB or a client call.
+  CLOSING = 4;
+  CLOSED = 5;    // !!States after this have not been used yet.
+  DELETING = 6;
+  DELETED = 7;   // object is deleted.
+}
+
+enum LifeCycleEvent {
+  CREATE = 1;    // A request to the client to create this object.
+  CREATED = 2;
+  FINALIZE = 3;
+  CLOSE = 4;     // !!Events after this have not been used yet.
+  UPDATE = 5;
+  TIMEOUT = 6;   // creation has timed out from SCM's view.
+  DELETE = 7;
+  CLEANUP = 8;
+}
+
+message SCMContainerInfo {
+  // TODO: Remove the container name from the pipeline.
+  required string containerName = 1;
+  required LifeCycleState state = 2;
+  required Pipeline pipeline = 3;
+  // This is not the total size of the container, but the space allocated by
+  // SCM for clients to write blocks.
+  required uint64 allocatedBytes = 4;
+  required uint64 usedBytes = 5;
+  required uint64 numberOfKeys = 6;
+  optional int64 stateEnterTime = 7;
+  required string owner = 8;
+  required int64 containerID = 9;
+}
+
+message GetScmInfoRequestProto {
+}
+
+message GetScmInfoRespsonseProto {
+  required string clusterId = 1;
+  required string scmId = 2;
+}
+
+enum ReplicationType {
+  RATIS = 1;
+  STAND_ALONE = 2;
+  CHAINED = 3;
+}
+
+enum ReplicationFactor {
+  ONE = 1;
+  THREE = 3;
+}
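Finally, a small sketch (illustrative only, not part of this patch) showing how the hdds.proto messages nest: a DatanodeDetailsProto is placed into a PipelineChannel, which in turn backs a Pipeline. The identifiers, addresses and ports below are hypothetical placeholders; unset optional fields of PipelineChannel fall back to the defaults declared above (OPEN, STAND_ALONE, ONE).

// Illustrative sketch -- not part of this patch.
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

public class HddsPipelineExample {
  public static void main(String[] args) {
    // A single datanode entry (all values are hypothetical placeholders).
    HddsProtos.DatanodeDetailsProto dn =
        HddsProtos.DatanodeDetailsProto.newBuilder()
            .setUuid("11111111-2222-3333-4444-555555555555")
            .setIpAddress("10.0.0.1")
            .setHostName("dn1.example.com")
            .setRatisPort(9858)
            .build();

    // A stand-alone channel led by that datanode; state, type and factor are
    // left unset and take the declared defaults.
    HddsProtos.PipelineChannel channel =
        HddsProtos.PipelineChannel.newBuilder()
            .setLeaderID(dn.getUuid())
            .addMembers(dn)
            .build();

    // The pipeline that backs a hypothetical container.
    HddsProtos.Pipeline pipeline =
        HddsProtos.Pipeline.newBuilder()
            .setContainerName("container-0001")
            .setPipelineChannel(channel)
            .build();

    System.out.println(pipeline);
  }
}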