WIP Various fixes and performance improvements.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13f7b647
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13f7b647
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13f7b647

Branch: refs/heads/feature-HDFS-8286
Commit: 13f7b64731e05903832eb289ae067f7e24afac41
Parents: 23dbb0f
Author: Haohui Mai <whe...@apache.org>
Authored: Wed Jun 3 14:14:54 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Fri Jun 12 13:57:01 2015 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSDirectory.java       | 12 ++-
 .../server/namenode/LevelDBROTransaction.java   | 39 ++++++---
 .../server/namenode/LevelDBRWTransaction.java   |  9 ++-
 .../namenode/LevelDBReplayTransaction.java      |  9 ++-
 .../hdfs/server/namenode/MemDBChildrenView.java |  6 +-
 .../hdfs/server/namenode/LevelDBProfile.java    | 83 ++++++++++++++++++++
 .../java/org/apache/hadoop/hdfs/hdfsdb/DB.java  | 23 +++++-
 .../org/apache/hadoop/hdfs/hdfsdb/Options.java  |  6 ++
 .../apache/hadoop/hdfs/hdfsdb/ReadOptions.java  |  6 ++
 .../org/apache/hadoop/hdfs/hdfsdb/Snapshot.java | 34 ++++++++
 .../src/main/native/hdfsdb/db/db_impl.cc        | 46 +++++++++++
 .../src/main/native/hdfsdb/db/db_impl.h         |  4 +
 .../src/main/native/hdfsdb/db/db_test.cc        |  8 ++
 .../src/main/native/hdfsdb/db/memtable.cc       |  8 +-
 .../src/main/native/hdfsdb/db/memtable.h        | 13 ++-
 .../src/main/native/hdfsdb/include/leveldb/db.h | 11 +++
 .../src/main/native/jni/bindings.cc             | 66 +++++++++++++++-
 17 files changed, 355 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 0e50d8c..b3c6083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -348,7 +348,17 @@ public class FSDirectory implements Closeable {
     this.enableLevelDb = conf.getBoolean("dfs.partialns", false);
     if (enableLevelDb) {
       String dbPath = conf.get("dfs.partialns.path");
-      Options options = new Options().createIfMissing(true);
+      int writeBufferSize = conf.getInt("dfs.partialns.writebuffer",
+          4096 * 1024);
+      long blockCacheSize = conf.getLong(
+          "dfs.partialns.blockcache", 0);
+      Options options = new Options().createIfMissing(true)
+          .writeBufferSize(writeBufferSize);
+
+      if (blockCacheSize != 0) {
+        options.blockCacheSize(blockCacheSize);
+      }
+
       this.levelDb = org.apache.hadoop.hdfs.hdfsdb.DB.open(options, dbPath);
       try (RWTransaction tx = newRWTransaction().begin()) {
         tx.putINode(ROOT_INODE_ID, createRootForFlatNS(ns));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
index f55ed63..50d8c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
@@ -29,37 +29,43 @@ import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
 class LevelDBROTransaction extends ROTransaction {
   private final org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb;
-  private static final ReadOptions OPTIONS = new ReadOptions();
+
+  private Snapshot snapshot;
+  private final ReadOptions options = new ReadOptions();
+  public static final ReadOptions OPTIONS = new ReadOptions();
+
   LevelDBROTransaction(FSDirectory fsd, org.apache.hadoop.hdfs.hdfsdb.DB db) {
     super(fsd);
     this.hdfsdb = db;
   }
 
   LevelDBROTransaction begin() {
-    fsd.readLock();
+    snapshot = hdfsdb.snapshot();
+    options.snapshot(snapshot);
     return this;
   }
 
   @Override
   FlatINode getINode(long id) {
-    return getFlatINode(id, hdfsdb);
+    return getFlatINode(id, hdfsdb, options);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return getChild(parentId, localName, hdfsdb);
+    return getChild(parentId, localName, hdfsdb, options);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return getChildrenView(parent, hdfsdb);
+    return getChildrenView(parent, hdfsdb, options);
   }
 
   static FlatINode getFlatINode(
-      long id, org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb) {
+      long id, DB hdfsdb, ReadOptions options) {
     byte[] key = inodeKey(id);
     try {
-      byte[] bytes = hdfsdb.get(OPTIONS, key);
+      byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key) : hdfsdb
+          .snapshotGet(options, key);
       if (bytes == null) {
         return null;
       }
@@ -83,11 +89,13 @@ class LevelDBROTransaction extends ROTransaction {
     };
   }
 
-  static long getChild(long parentId, ByteBuffer localName, DB hdfsdb) {
+  static long getChild(
+      long parentId, ByteBuffer localName, DB hdfsdb, ReadOptions options) {
     Preconditions.checkArgument(localName.hasRemaining());
     byte[] key = inodeChildKey(parentId, localName);
     try {
-      byte[] bytes = hdfsdb.get(OPTIONS, key);
+      byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key) : hdfsdb
+          .snapshotGet(options, key);
       if (bytes == null) {
         return INVALID_INODE_ID;
       }
@@ -109,7 +117,8 @@ class LevelDBROTransaction extends ROTransaction {
     return key;
   }
 
-  static DBChildrenView getChildrenView(long parent, DB hdfsdb) {
+  static DBChildrenView getChildrenView(
+      long parent, DB hdfsdb, ReadOptions options) {
     byte[] key = new byte[]{'I',
         (byte) ((parent >> 56) & 0xff),
        (byte) ((parent >> 48) & 0xff),
@@ -121,9 +130,17 @@ class LevelDBROTransaction extends ROTransaction {
         (byte) (parent & 0xff),
         1
     };
-    Iterator it = hdfsdb.iterator(OPTIONS);
+    Iterator it = hdfsdb.iterator(options);
     it.seek(key);
     return new LevelDBChildrenView(parent, it);
   }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      snapshot.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
index 3f14cff..4c8a2d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
@@ -37,17 +37,20 @@ class LevelDBRWTransaction extends RWTransaction {
 
   @Override
   FlatINode getINode(long id) {
-    return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+    return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+        LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+        LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+    return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+        LevelDBROTransaction.OPTIONS);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
index e1b8eff..d486010 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
@@ -38,17 +38,20 @@ public class LevelDBReplayTransaction extends ReplayTransaction {
 
   @Override
   FlatINode getINode(long id) {
-    return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+    return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+        LevelDBROTransaction.OPTIONS);
  }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+        LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+    return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+        LevelDBROTransaction.OPTIONS);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
index 3278111..743bf0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
@@ -31,6 +31,10 @@ class MemDBChildrenView extends DBChildrenView {
 
   @Override
   public Iterator<Map.Entry<ByteBuffer, Long>> iterator() {
-    return childrenMap.tailMap(start).entrySet().iterator();
+    if (start == null) {
+      return childrenMap.entrySet().iterator();
+    } else {
+      return childrenMap.tailMap(start).entrySet().iterator();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
new file mode 100644
index 0000000..51c4dba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
@@ -0,0 +1,83 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.hdfsdb.*;
+import org.apache.log4j.Level;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Created by hmai on 6/3/15.
+ */
+public class LevelDBProfile {
+  private static final String DB_PATH = "/Users/hmai/work/test/partialnsdb";
+  private static final int TIMES = 300000;
+  public static void main(String[] args) throws Exception {
+    MiniDFSCluster cluster = null;
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean("dfs.partialns", true);
+    conf.set("dfs.partialns.path", DB_PATH);
+    conf.setInt("dfs.partialns.writebuffer", 8388608 * 16);
+    conf.setLong("dfs.partialns.blockcache", 4294967296L);
+    ExecutorService executor = Executors.newFixedThreadPool(8, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "Executor");
+      }
+    });
+    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+
+    try {
+      FileUtils.deleteDirectory(new File(DB_PATH));
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final org.apache.hadoop.hdfs.hdfsdb.DB db = cluster.getNamesystem().getFSDirectory().getLevelDb();
+      final Path PATH = new Path("/foo");
+      final byte[] p = new byte[20];
+      try (OutputStream os = fs.create(PATH)) {
+      }
+      cluster.shutdownDataNodes();
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final Runnable getFileStatus = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            fsn.getFileInfo("/foo", true);
+            //fs.getFileStatus(PATH);
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+
+      long start = monotonicNow();
+      for (int i = 0; i < TIMES; ++i) {
+        executor.submit(getFileStatus);
+      }
+      executor.shutdown();
+      executor.awaitTermination(1, TimeUnit.HOURS);
+      long end = monotonicNow();
+      System.err.println("Time: " + (end - start) + " ms");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
index 0355dcc..57ec71e 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
@@ -21,7 +21,8 @@ import java.io.IOException;
 
 public class DB extends NativeObject {
   public static DB open(Options options, String path) throws IOException {
-    return new DB(open(options.nativeHandle(), path));
+    long handle = open(options.nativeHandle(), path);
+    return new DB(handle);
   }
 
   @Override
@@ -36,6 +37,11 @@ public class DB extends NativeObject {
     return get(nativeHandle, options.nativeHandle(), key);
   }
 
+  public byte[] snapshotGet(ReadOptions options, byte[] key) throws
+      IOException {
+    return snapshotGet(nativeHandle, options.nativeHandle(), key);
+  }
+
   public void write(WriteOptions options, WriteBatch batch) throws IOException {
     write(nativeHandle, options.nativeHandle(), batch.nativeHandle());
   }
@@ -52,6 +58,14 @@ public class DB extends NativeObject {
     return new Iterator(newIterator(nativeHandle, options.nativeHandle()));
   }
 
+  public Snapshot snapshot() {
+    return new Snapshot(nativeHandle, newSnapshot(nativeHandle));
+  }
+
+  public byte[] dbGetTest(byte[] key) throws IOException {
+    return getTest(nativeHandle, key);
+  }
+
   private DB(long handle) {
     super(handle);
   }
@@ -60,6 +74,8 @@ public class DB extends NativeObject {
   private static native void close(long handle);
   private static native byte[] get(long handle, long options, byte[] key)
       throws IOException;
+  private static native byte[] snapshotGet(long handle, long options,
+      byte[] key) throws IOException;
   private static native void write(long handle, long options, long batch)
       throws IOException;
   private static native void put(long handle, long options,
@@ -67,4 +83,9 @@ public class DB extends NativeObject {
   private static native void delete(long handle, long options, byte[] key);
 
   private static native long newIterator(long handle, long readOptions);
+  private static native long newSnapshot(long handle);
+  static native void releaseSnapshot(long handle, long snapshotHandle);
+
+  private static native byte[] getTest(long handle, byte[] key) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
index a12da61..626e4d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
@@ -56,6 +56,11 @@ public class Options extends NativeObject {
     return this;
   }
 
+  public Options blockCacheSize(long capacity) {
+    blockCacheSize(nativeHandle, capacity);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -70,4 +75,5 @@
   private static native void compressionType(long handle, int value);
   private static native void writeBufferSize(long handle, int value);
   private static native void blockSize(long handle, int value);
+  private static native void blockCacheSize(long handle, long capacity);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
index e97e05f..b2e6726 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
@@ -22,6 +22,11 @@ public class ReadOptions extends NativeObject {
     super(construct());
   }
 
+  public ReadOptions snapshot(Snapshot snapshot) {
+    snapshot(nativeHandle, snapshot.nativeHandle);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -32,4 +37,5 @@
 
   private static native long construct();
   private static native void destruct(long handle);
+  private static native void snapshot(long handle, long snapshot);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
new file mode 100644
index 0000000..ad370b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.hdfsdb;
+
+public class Snapshot extends NativeObject {
+  private final long dbHandle;
+  Snapshot(long dbHandle, long nativeHandle) {
+    super(nativeHandle);
+    this.dbHandle = dbHandle;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (nativeHandle != 0) {
+      DB.releaseSnapshot(dbHandle, nativeHandle);
+      nativeHandle = 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
index 1225412..2d0bef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
@@ -1121,6 +1121,52 @@ Status DBImpl::Get(const ReadOptions& options,
   return s;
 }
 
+Status DBImpl::SnapshotGet(const ReadOptions& options,
+                           const Slice& key,
+                           const std::function<void(const Slice&)> &get_value) {
+  Status s;
+  assert(options.snapshot);
+  SequenceNumber snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
+  LookupKey lkey(key, snapshot);
+
+  //mutex_.Lock();
+  MemTable* mem = mem_;
+  MemTable* imm = imm_;
+  //mutex_.Unlock();
+  mem->Ref();
+  if (imm != NULL) imm->Ref();
+
+  // First look in the memtable, then in the immutable memtable (if any).
+  if (mem->Get(lkey, get_value, &s)) {
+    // Done
+  } else if (imm != NULL && imm->Get(lkey, get_value, &s)) {
+    // Done
+  } else {
+    assert (false);
+    mutex_.Lock();
+    Version* current = versions_->current();
+    current->Ref();
+    // Unlock while reading from files and memtables
+    mutex_.Unlock();
+    Version::GetStats stats;
+    std::string value;
+    s = current->Get(options, lkey, &value, &stats);
+    if (value.size()) {
+      get_value(Slice(value));
+    }
+    mutex_.Lock();
+    if (current->UpdateStats(stats)) {
+      MaybeScheduleCompaction();
+    }
+    current->Unref();
+    mutex_.Unlock();
+  }
+
+  mem->Unref();
+  if (imm != NULL) imm->Unref();
+  return s;
+}
+
 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   SequenceNumber latest_snapshot;
   uint32_t seed;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
index cfc9981..21a6c5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
@@ -35,6 +35,10 @@ class DBImpl : public DB {
   virtual Status Get(const ReadOptions& options,
                      const Slice& key,
                      std::string* value);
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key,
+                             const std::function<void(const Slice&)>
+                             &get_value);
   virtual Iterator* NewIterator(const ReadOptions&);
   virtual const Snapshot* GetSnapshot();
   virtual void ReleaseSnapshot(const Snapshot* snapshot);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
index 280b01c..3462fe7 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
@@ -1848,6 +1848,14 @@ class ModelDB: public DB {
     assert(false);      // Not implemented
     return Status::NotFound(key);
   }
+
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key, const
+                             std::function<void(const Slice&)> &) {
+    assert(false);      // Not implemented
+    return Status::NotFound(key);
+  }
+
   virtual Iterator* NewIterator(const ReadOptions& options) {
     if (options.snapshot == NULL) {
       KVMap* saved = new KVMap;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
index bfec0a7..9cabcae 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
@@ -106,6 +106,12 @@ void MemTable::Add(SequenceNumber s, ValueType type,
 }
 
 bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
+  return Get(key, [value](const Slice &v) {
+    value->assign(v.data(), v.size()); }, s);
+}
+
+bool MemTable::Get(const LookupKey& key,
+                   const std::function<void(const Slice&)> &get_value, Status* s) {
   Slice memkey = key.memtable_key();
   Table::Iterator iter(&table_);
   iter.Seek(memkey.data());
@@ -130,7 +136,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
     switch (static_cast<ValueType>(tag & 0xff)) {
       case kTypeValue: {
         Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
-        value->assign(v.data(), v.size());
+        get_value(v);
         return true;
       }
       case kTypeDeletion:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
index 92e90bb..31835bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
@@ -6,6 +6,9 @@
 #define STORAGE_LEVELDB_DB_MEMTABLE_H_
 
 #include <string>
+#include <functional>
+#include <atomic>
+
 #include "leveldb/db.h"
 #include "db/dbformat.h"
 #include "db/skiplist.h"
@@ -28,9 +31,9 @@ class MemTable {
 
   // Drop reference count.  Delete if no more references exist.
   void Unref() {
-    --refs_;
-    assert(refs_ >= 0);
-    if (refs_ <= 0) {
+    int v = std::atomic_fetch_sub(&refs_, 1);
+    assert(v >= 0);
+    if (v <= 0) {
       delete this;
     }
   }
@@ -62,6 +65,8 @@ class MemTable {
   // in *status and return true.
   // Else, return false.
   bool Get(const LookupKey& key, std::string* value, Status* s);
+  bool Get(const LookupKey& key, const std::function<void(const Slice&)>
+      &get_value, Status* s);
 
  private:
   ~MemTable();  // Private since only Unref() should be used to delete it
@@ -77,7 +82,7 @@ class MemTable {
   typedef SkipList<const char*, KeyComparator> Table;
 
   KeyComparator comparator_;
-  int refs_;
+  std::atomic_int refs_;
   Arena arena_;
   Table table_;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
index 40851b2..a81fe1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
@@ -7,6 +7,8 @@
 
 #include <stdint.h>
 #include <stdio.h>
+#include <functional>
+
 #include "leveldb/iterator.h"
 #include "leveldb/options.h"
 
@@ -83,6 +85,15 @@ class DB {
   virtual Status Get(const ReadOptions& options,
                      const Slice& key, std::string* value) = 0;
 
+  // Get the value from a particular snapshot. The call only blocks if
+  // the value resides in the block cache or on the disk.
+  //
+  // May return some other Status on an error.
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key,
+                             const std::function<void(const Slice&)>
+                             &get_value) = 0;
+
   // Return a heap-allocated iterator over the contents of the database.
   // The result of NewIterator() is initially invalid (caller must
   // call one of the Seek methods on the iterator before using it).
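
Taken together, the pieces above define the new read path: DBImpl::SnapshotGet hands the value to a std::function callback instead of copying it into a std::string, DB.java exposes it as snapshotGet(ReadOptions, byte[]), and ReadOptions.snapshot(Snapshot) pins the sequence number that the read observes. Below is a minimal Java sketch of that pattern, mirroring what LevelDBROTransaction.begin() does in the patch. It is an illustration, not part of the commit: it assumes NativeObject implements AutoCloseable (the @Override close() methods suggest it) and that DB exposes a public close(); the database path and key bytes are made up.

    import org.apache.hadoop.hdfs.hdfsdb.DB;
    import org.apache.hadoop.hdfs.hdfsdb.Options;
    import org.apache.hadoop.hdfs.hdfsdb.ReadOptions;
    import org.apache.hadoop.hdfs.hdfsdb.Snapshot;

    public class SnapshotReadSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical on-disk location; in the patch, dfs.partialns.path
        // supplies the real one.
        DB db = DB.open(new Options().createIfMissing(true), "/tmp/hdfsdb-sketch");
        // Pin a point-in-time view of the database, as
        // LevelDBROTransaction.begin() now does instead of fsd.readLock().
        try (Snapshot snapshot = db.snapshot();
             ReadOptions options = new ReadOptions().snapshot(snapshot)) {
          byte[] key = {'I', 0, 0, 0, 0, 0, 0, 0, 1};  // illustrative inode key
          // The JNI layer maps Status::NotFound to null and other
          // non-ok statuses to IOException.
          byte[] value = db.snapshotGet(options, key);
          System.out.println(value == null ? "miss" : value.length + " bytes");
        } finally {
          db.close();
        }
      }
    }

Reads taken this way see a consistent view of the namespace even while concurrent RW transactions commit, which is what allows begin() to drop the FSDirectory read lock.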
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f7b647/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
index 33604b8..2cae088 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 #include <jni.h>
-
+#include <mutex>
 #undef JNIEXPORT
 #if _WIN32
 #define JNIEXPORT __declspec(dllexport)
@@ -33,11 +33,12 @@
 #include "org_apache_hadoop_hdfs_hdfsdb_WriteOptions.h"
 
 #include <leveldb/db.h>
+#include <leveldb/cache.h>
 #include <leveldb/options.h>
 #include <leveldb/write_batch.h>
 #include <leveldb/cache.h>
 
-static inline uintptr_t uintptr(void *ptr) {
+static inline uintptr_t uintptr(const void *ptr) {
   return reinterpret_cast<uintptr_t>(ptr);
 }
 
@@ -130,6 +131,29 @@ jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_get(JNIEnv *env, jclass
   return ToJByteArray(env, leveldb::Slice(result));
 }
 
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_snapshotGet(JNIEnv *env, jclass, jlong handle, jlong jread_options, jbyteArray jkey) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
+  jbyteArray res = NULL;
+  leveldb::Status status;
+  {
+    JNIByteArrayHolder<GetByteArrayCritical> key(env, jkey);
+    status = db->SnapshotGet(*options, key.slice(),
+                             [env,&res](const leveldb::Slice &v) {
+                               res = ToJByteArray(env, v);
+                             });
+  }
+
+  if (status.IsNotFound()) {
+    return NULL;
+  } else if (!status.ok()) {
+    env->ThrowNew(env->FindClass("java/io/IOException"), status.ToString().c_str());
+    return NULL;
+  }
+
+  return res;
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_write(JNIEnv *env, jclass, jlong handle, jlong jwrite_options, jlong jbatch) {
   leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
   leveldb::WriteOptions *options = reinterpret_cast<leveldb::WriteOptions*>(jwrite_options);
@@ -150,10 +174,33 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_delete(JNIEnv *env, jclass, j
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newIterator(JNIEnv *, jclass, jlong handle, jlong jread_options) {
   leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
   leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
-  auto res = uintptr(db->NewIterator(*options));
+  uintptr_t res = uintptr(db->NewIterator(*options));
   return res;
 }
 
+jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newSnapshot(JNIEnv *, jclass, jlong handle) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  uintptr_t res = uintptr(db->GetSnapshot());
+  return res;
+}
+
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_releaseSnapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  leveldb::Snapshot *s = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+  db->ReleaseSnapshot(s);
+}
+
+static std::mutex mutex;
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_getTest(JNIEnv *env,
+jclass, jlong handle, jbyteArray jkey) {
+  mutex.lock();
+  JNIByteArrayHolder<GetByteArrayElements> key(env, jkey);
+  std::string result;
+  result.resize(100);
+  mutex.unlock();
+  return ToJByteArray(env, leveldb::Slice(result));
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Iterator_destruct(JNIEnv *, jclass, jlong handle) {
   delete reinterpret_cast<leveldb::Iterator*>(handle);
 }
@@ -212,6 +259,14 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Options_blockSize(JNIEnv *, jcla
   options->block_size = value;
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Options_blockCacheSize(JNIEnv *, jclass, jlong handle, jlong value) {
+  leveldb::Options *options = reinterpret_cast<leveldb::Options*>(handle);
+  if (options->block_cache) {
+    delete options->block_cache;
+  }
+  options->block_cache = leveldb::NewLRUCache(value);
+}
+
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_construct(JNIEnv *, jclass) {
   return uintptr(new leveldb::ReadOptions());
 }
@@ -220,6 +275,11 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_destruct(JNIEnv *, j
   delete reinterpret_cast<leveldb::ReadOptions*>(handle);
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_snapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+  leveldb::ReadOptions *o = reinterpret_cast<leveldb::ReadOptions*>(handle);
+  o->snapshot = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+}
+
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_construct(JNIEnv *, jclass) {
   return uintptr(new leveldb::WriteOptions());
 }
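
Two of the hunks above cooperate on cache tuning: FSDirectory reads dfs.partialns.writebuffer (defaulting to 4096 * 1024 bytes) and dfs.partialns.blockcache (skipped when 0), and Options_blockCacheSize in bindings.cc installs the requested leveldb::NewLRUCache. A small configuration sketch follows; the four keys come from the patch, while the path and the choice of sizes are made up for illustration (LevelDBProfile above happens to use the same two sizes for its getFileInfo benchmark).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class PartialNsConfigSketch {
      public static Configuration partialNsConf() {
        Configuration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.partialns", true);           // enable the LevelDB-backed namespace
        conf.set("dfs.partialns.path", "/var/lib/hdfs/partialnsdb");  // hypothetical location
        conf.setInt("dfs.partialns.writebuffer", 8388608 * 16);      // 128 MB write buffer
        conf.setLong("dfs.partialns.blockcache", 4294967296L);       // 4 GB LRU block cache; 0 disables
        return conf;
      }
    }

Note that Options_blockCacheSize deletes any previously installed cache before calling leveldb::NewLRUCache, so setting the key more than once on the same Options instance does not leak the old cache.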