This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch clean-rocksdb
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit e7c7558acdc8a27167dc5f2f36168a3ef4423b53
Author: imbajin <[email protected]>
AuthorDate: Mon Apr 17 19:23:01 2023 +0800

    refact(rocksdb): clean & reformat some code

    also mark TODO in some gists
---
 .../backend/store/rocksdb/OpenedRocksDB.java       |  20 +-
 .../backend/store/rocksdb/RocksDBIngester.java     |  14 +-
 .../backend/store/rocksdb/RocksDBIteratorPool.java |  10 +-
 .../backend/store/rocksdb/RocksDBMetrics.java      |   1 +
 .../backend/store/rocksdb/RocksDBOptions.java      |  10 +-
 .../backend/store/rocksdb/RocksDBSessions.java     |   3 +-
 .../backend/store/rocksdb/RocksDBStdSessions.java  | 203 ++++++++-------------
 .../backend/store/rocksdb/RocksDBStore.java        |   8 +-
 .../backend/store/rocksdb/RocksDBTable.java        |   6 +-
 .../backend/store/rocksdb/RocksDBTables.java       |   1 +
 .../store/rocksdbsst/RocksDBSstSessions.java       |  47 +++--
 .../backend/store/rocksdbsst/RocksDBSstStore.java  |  15 +-
 12 files changed, 138 insertions(+), 200 deletions(-)

diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
index 3ae6ba3fe..9b1017226 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
@@ -27,17 +27,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hugegraph.backend.BackendException;
+import org.apache.hugegraph.backend.store.rocksdb.RocksDBIteratorPool.ReusedRocksIterator;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.SstFileManager;
 import org.slf4j.Logger;
 
-import org.apache.hugegraph.backend.BackendException;
-import org.apache.hugegraph.backend.store.rocksdb.RocksDBIteratorPool.ReusedRocksIterator;
-import org.apache.hugegraph.util.E;
-import org.apache.hugegraph.util.Log;
-
 public class OpenedRocksDB implements AutoCloseable {
 
     private static final Logger LOG = Log.logger(OpenedRocksDB.class);
@@ -118,8 +117,7 @@ public class OpenedRocksDB implements AutoCloseable {
                                                tempFile, snapshotFile));
             }
         } catch (Exception e) {
-            throw new BackendException("Failed to create checkpoint at path %s",
-                                       e, targetPath);
+            throw new BackendException("Failed to create checkpoint at path %s", e, targetPath);
         }
     }
 
@@ -137,8 +135,7 @@ public class OpenedRocksDB implements AutoCloseable {
         }
 
         public synchronized ColumnFamilyHandle get() {
-            E.checkState(this.handle.isOwningHandle(),
-                         "It seems CF has been closed");
+            E.checkState(this.handle.isOwningHandle(), "It seems CF has been closed");
             assert this.refs.get() >= 1;
             return this.handle;
         }
@@ -163,7 +160,7 @@ public class OpenedRocksDB implements AutoCloseable {
 
         public synchronized ColumnFamilyHandle waitForDrop() {
             assert this.refs.get() >= 1;
-            // When entering this method, the refs won't increase any more
+            // When entering this method, the refs won't increase anymore
             final long timeout = TimeUnit.MINUTES.toMillis(30L);
             final long unit = 100L;
             for (long i = 1; this.refs.get() > 1; i++) {
@@ -173,8 +170,7 @@ public class OpenedRocksDB implements AutoCloseable {
                     // 30s rest api timeout may cause InterruptedException
                 }
                 if (i * unit > timeout) {
-                    throw new BackendException("Timeout after %sms to drop CF",
-                                               timeout);
+                    throw new BackendException("Timeout after %sms to drop CF", timeout);
                 }
             }
             assert this.refs.get() == 1;
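The createCheckpoint() code touched above wraps RocksDB's Checkpoint API, which hard-links the immutable SST files into a target directory. A minimal standalone sketch of that pattern follows; the paths here are hypothetical, and the target directory must not exist yet:

    import org.rocksdb.Checkpoint;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class CheckpointSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-data");
                 Checkpoint checkpoint = Checkpoint.create(db)) {
                // Hard-links SSTs when source and target share a filesystem,
                // so this is cheap compared to a full copy
                checkpoint.createCheckpoint("/tmp/rocksdb-data-checkpoint");
            }
        }
    }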
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIngester.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIngester.java
index ab89e19ef..fa30a389b 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIngester.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIngester.java
@@ -27,15 +27,14 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hugegraph.backend.BackendException;
+import org.apache.hugegraph.util.Log;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.IngestExternalFileOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 
-import org.apache.hugegraph.backend.BackendException;
-import org.apache.hugegraph.util.Log;
-
 public class RocksDBIngester {
 
     public static final String SST = ".sst";
@@ -52,8 +51,7 @@ public class RocksDBIngester {
         this.options.setMoveFiles(true);
     }
 
-    public List<String> ingest(Path path, ColumnFamilyHandle cf)
-                               throws RocksDBException {
+    public List<String> ingest(Path path, ColumnFamilyHandle cf) throws RocksDBException {
         SuffixFileVisitor visitor = new SuffixFileVisitor(SST);
         try {
             Files.walkFileTree(path, visitor);
@@ -74,10 +72,8 @@ public class RocksDBIngester {
         return ssts;
     }
 
-    public void ingest(ColumnFamilyHandle cf, List<String> ssts)
-                       throws RocksDBException {
-        LOG.info("Ingest sst files to CF '{}': {}",
-                 RocksDBStdSessions.decode(cf.getName()), ssts);
+    public void ingest(ColumnFamilyHandle cf, List<String> ssts) throws RocksDBException {
+        LOG.info("Ingest sst files to CF '{}': {}", RocksDBStdSessions.decode(cf.getName()), ssts);
         if (!ssts.isEmpty()) {
             this.rocksdb.ingestExternalFile(cf, ssts, this.options);
         }
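RocksDBIngester above collects *.sst files and hands them to ingestExternalFile() with setMoveFiles(true), so files are moved (hard-linked) into the database rather than copied. A minimal sketch against the default column family, with a hypothetical SST path:

    import java.util.Collections;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class IngestSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-data");
                 IngestExternalFileOptions ingest = new IngestExternalFileOptions()) {
                // Move instead of copy, as the ingester above configures
                ingest.setMoveFiles(true);
                db.ingestExternalFile(Collections.singletonList("/tmp/bulk/000001.sst"),
                                      ingest);
            }
        }
    }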
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIteratorPool.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIteratorPool.java
index 7aad1407e..eb52b6296 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIteratorPool.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBIteratorPool.java
@@ -20,17 +20,16 @@ package org.apache.hugegraph.backend.store.rocksdb;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 
+import org.apache.hugegraph.backend.BackendException;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.StringEncoding;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 
-import org.apache.hugegraph.backend.BackendException;
-import org.apache.hugegraph.config.CoreOptions;
-import org.apache.hugegraph.util.Log;
-import org.apache.hugegraph.util.StringEncoding;
-
 public final class RocksDBIteratorPool implements AutoCloseable {
 
     private static final Logger LOG = Log.logger(RocksDBIteratorPool.class);
@@ -149,6 +148,7 @@ public final class RocksDBIteratorPool implements AutoCloseable {
 
     protected final class ReusedRocksIterator {
 
+        // TODO: is the typo "EREUSING_ENABLED" right? or should be "REUSING_ENABLED"?
         private static final boolean EREUSING_ENABLED = false;
         private final RocksIterator iterator;
         private boolean closed;
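RocksDBIteratorPool keeps reusable RocksIterator instances in an ArrayBlockingQueue (imported above). The borrow/return pattern itself, independent of hugegraph's actual class, is roughly the following sketch; the class and method names here are illustrative, not hugegraph's:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.function.Supplier;

    // Non-blocking pool sketch: poll() and offer() never block, so the
    // queue only caps how many idle objects are retained, not how many
    // may exist at once.
    public class PoolSketch<T> {

        private final Queue<T> queue;
        private final Supplier<T> factory;

        public PoolSketch(int capacity, Supplier<T> factory) {
            this.queue = new ArrayBlockingQueue<>(capacity);
            this.factory = factory;
        }

        public T borrow() {
            T pooled = this.queue.poll();
            return pooled != null ? pooled : this.factory.get();
        }

        public void giveBack(T object) {
            // Silently dropped if the pool is already full
            this.queue.offer(object);
        }
    }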
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBMetrics.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBMetrics.java
index 61462d6f8..6719545ba 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBMetrics.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hugegraph.backend.store.BackendMetrics;
 import org.apache.hugegraph.util.Bytes;
 import org.apache.hugegraph.util.InsertionOrderUtil;
 import org.apache.hugegraph.util.UnitUtil;
+
 import com.google.common.collect.ImmutableMap;
 
 public class RocksDBMetrics implements BackendMetrics {
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBOptions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBOptions.java
index a696b6cc3..9dc4b63a5 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBOptions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBOptions.java
@@ -23,17 +23,17 @@ import static org.apache.hugegraph.config.OptionChecker.inValues;
 import static org.apache.hugegraph.config.OptionChecker.rangeDouble;
 import static org.apache.hugegraph.config.OptionChecker.rangeInt;
 
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DataBlockIndexType;
-import org.rocksdb.IndexType;
-
 import org.apache.hugegraph.config.ConfigConvOption;
 import org.apache.hugegraph.config.ConfigListConvOption;
 import org.apache.hugegraph.config.ConfigListOption;
 import org.apache.hugegraph.config.ConfigOption;
 import org.apache.hugegraph.config.OptionHolder;
 import org.apache.hugegraph.util.Bytes;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DataBlockIndexType;
+import org.rocksdb.IndexType;
+
 import com.google.common.collect.ImmutableList;
 
 public class RocksDBOptions extends OptionHolder {
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBSessions.java
index 8614d6b73..fa40be8ea 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBSessions.java
@@ -21,12 +21,11 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.rocksdb.RocksDBException;
-
 import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
 import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
 import org.apache.hugegraph.backend.store.BackendSessionPool;
 import org.apache.hugegraph.config.HugeConfig;
+import org.rocksdb.RocksDBException;
 
 public abstract class RocksDBSessions extends BackendSessionPool {
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index 71a66906d..fd837906c 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -32,6 +32,18 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hugegraph.backend.BackendException;
+import org.apache.hugegraph.backend.serializer.BinarySerializer;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
+import org.apache.hugegraph.backend.store.BackendEntryIterator;
+import org.apache.hugegraph.backend.store.rocksdb.RocksDBIteratorPool.ReusedRocksIterator;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.util.Bytes;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.StringEncoding;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -57,18 +69,6 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 
-import org.apache.hugegraph.backend.BackendException;
-import org.apache.hugegraph.backend.serializer.BinarySerializer;
-import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
-import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
-import org.apache.hugegraph.backend.store.BackendEntryIterator;
-import org.apache.hugegraph.backend.store.rocksdb.RocksDBIteratorPool.ReusedRocksIterator;
-import org.apache.hugegraph.config.CoreOptions;
-import org.apache.hugegraph.config.HugeConfig;
-import org.apache.hugegraph.util.Bytes;
-import org.apache.hugegraph.util.E;
-import org.apache.hugegraph.util.Log;
-import org.apache.hugegraph.util.StringEncoding;
 import com.google.common.collect.ImmutableList;
 
 public class RocksDBStdSessions extends RocksDBSessions {
@@ -166,8 +166,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
     }
 
     @Override
-    public synchronized void dropTable(String... tables)
-                                       throws RocksDBException {
+    public synchronized void dropTable(String... tables) throws RocksDBException {
         this.checkValid();
 
         /*
@@ -210,10 +209,8 @@ public class RocksDBStdSessions extends RocksDBSessions {
         if (this.rocksdb.isOwningHandle()) {
             this.rocksdb.close();
         }
-        this.rocksdb = RocksDBStdSessions.openRocksDB(this.config,
-                                                      ImmutableList.of(),
-                                                      this.dataPath,
-                                                      this.walPath);
+        this.rocksdb = RocksDBStdSessions.openRocksDB(this.config, ImmutableList.of(),
+                                                      this.dataPath, this.walPath);
     }
 
     @Override
@@ -252,8 +249,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
     }
 
     @Override
-    public RocksDBSessions copy(HugeConfig config,
-                                String database, String store) {
+    public RocksDBSessions copy(HugeConfig config, String database, String store) {
         return new RocksDBStdSessions(config, database, store, this);
     }
 
@@ -299,24 +295,20 @@ public class RocksDBStdSessions extends RocksDBSessions {
         // Like: rocksdb-data/*
         Path pureDataPath = parentParentPath.relativize(originDataPath);
         // Like: parent_path/snapshot_rocksdb-data/*
-        Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
-                                                     pureDataPath);
+        Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" + pureDataPath);
         E.checkArgument(snapshotPath.toFile().exists(),
-                        "The snapshot path '%s' doesn't exist",
-                        snapshotPath);
+                        "The snapshot path '%s' doesn't exist", snapshotPath);
         return snapshotPath.toString();
     }
 
     @Override
     public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
         String snapshotLinkPath = this.dataPath + "_temp";
-        try (OpenedRocksDB rocksdb = openRocksDB(this.config,
-                                                 ImmutableList.of(),
+        try (OpenedRocksDB rocksdb = openRocksDB(this.config, ImmutableList.of(),
                                                  snapshotPath, null)) {
             rocksdb.createCheckpoint(snapshotLinkPath);
         }
-        LOG.info("The snapshot {} has been hard linked to {}",
-                 snapshotPath, snapshotLinkPath);
+        LOG.info("The snapshot {} has been hard linked to {}", snapshotPath, snapshotLinkPath);
        return snapshotLinkPath;
    }
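The snapshotPath() logic reformatted above derives "parent_path/snapshot_rocksdb-data/*" from the data path with java.nio relativize()/resolve(). The same derivation in isolation, with hypothetical directory names and a fixed "snapshot" prefix:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class SnapshotPathSketch {
        public static void main(String[] args) {
            Path originDataPath = Paths.get("/data/parent/rocksdb-data/g").toAbsolutePath();
            Path parentParentPath = originDataPath.getParent().getParent();
            // Like: rocksdb-data/g
            Path pureDataPath = parentParentPath.relativize(originDataPath);
            // Like: /data/parent/snapshot_rocksdb-data/g
            Path snapshotPath = parentParentPath.resolve("snapshot" + "_" + pureDataPath);
            System.out.println(snapshotPath);
        }
    }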
@@ -327,8 +319,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
 
     @Override
     protected final Session newSession() {
-        E.checkState(this.rocksdb.isOwningHandle(),
-                     "RocksDB has not been initialized");
+        E.checkState(this.rocksdb.isOwningHandle(), "RocksDB has not been initialized");
         return new StdSession(this.config());
     }
 
@@ -344,8 +335,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
     }
 
     private void checkValid() {
-        E.checkState(this.rocksdb.isOwningHandle(),
-                     "It seems RocksDB has been closed");
+        E.checkState(this.rocksdb.isOwningHandle(), "It seems RocksDB has been closed");
     }
 
     private RocksDB rocksdb() {
@@ -379,9 +369,8 @@ public class RocksDBStdSessions extends RocksDBSessions {
         }
     }
 
-    private static OpenedRocksDB openRocksDB(HugeConfig config,
-                                             String dataPath, String walPath)
-                                             throws RocksDBException {
+    private static OpenedRocksDB openRocksDB(HugeConfig config, String dataPath,
+                                             String walPath) throws RocksDBException {
         // Init options
         Options options = new Options();
         RocksDBStdSessions.initOptions(config, options, options,
@@ -399,9 +388,8 @@ public class RocksDBStdSessions extends RocksDBSessions {
     }
 
     private static OpenedRocksDB openRocksDB(HugeConfig config,
-                                             List<String> cfNames,
-                                             String dataPath, String walPath)
-                                             throws RocksDBException {
+                                             List<String> cfNames, String dataPath,
+                                             String walPath) throws RocksDBException {
         // Old CFs should always be opened
         Set<String> mergedCFs = RocksDBStdSessions.mergeOldCFs(dataPath, cfNames);
@@ -412,8 +400,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
         for (String cf : cfs) {
             ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
             ColumnFamilyOptions options = cfd.getOptions();
-            RocksDBStdSessions.initOptions(config, null, null,
-                                           options, options);
+            RocksDBStdSessions.initOptions(config, null, null, options, options);
             cfds.add(cfd);
         }
 
@@ -440,8 +427,8 @@ public class RocksDBStdSessions extends RocksDBSessions {
         return new OpenedRocksDB(rocksdb, cfHandles, sstFileManager);
     }
 
-    private static Set<String> mergeOldCFs(String path, List<String> cfNames)
-                                           throws RocksDBException {
+    private static Set<String> mergeOldCFs(String path,
+                                           List<String> cfNames) throws RocksDBException {
         Set<String> cfs = listCFs(path);
         cfs.addAll(cfNames);
         return cfs;
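openRocksDB() above builds one ColumnFamilyDescriptor per table and gets the matching ColumnFamilyHandles back from RocksDB.open(). A minimal sketch of that API shape; the path and the "g_vertex" CF name are hypothetical:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class OpenCfSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            List<ColumnFamilyDescriptor> cfds = Arrays.asList(
                    new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                    new ColumnFamilyDescriptor("g_vertex".getBytes(StandardCharsets.UTF_8)));
            List<ColumnFamilyHandle> cfhs = new ArrayList<>();
            try (DBOptions options = new DBOptions().setCreateIfMissing(true)
                                                    .setCreateMissingColumnFamilies(true);
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-data", cfds, cfhs)) {
                // cfhs.get(i) now corresponds to cfds.get(i); close the
                // handles before the db itself is closed
                for (ColumnFamilyHandle cfh : cfhs) {
                    cfh.close();
                }
            }
        }
    }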
@@ -535,8 +522,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
          */
         mdb.setMaxBackgroundJobs(conf.get(RocksDBOptions.MAX_BG_JOBS));
 
-        mdb.setDelayedWriteRate(
-                conf.get(RocksDBOptions.DELAYED_WRITE_RATE));
+        mdb.setDelayedWriteRate(conf.get(RocksDBOptions.DELAYED_WRITE_RATE));
 
         mdb.setMaxOpenFiles(conf.get(RocksDBOptions.MAX_OPEN_FILES));
 
@@ -544,14 +530,12 @@ public class RocksDBStdSessions extends RocksDBSessions {
         mdb.setBytesPerSync(conf.get(RocksDBOptions.BYTES_PER_SYNC));
         mdb.setWalBytesPerSync(conf.get(RocksDBOptions.WAL_BYTES_PER_SYNC));
-        mdb.setStrictBytesPerSync(
-                conf.get(RocksDBOptions.STRICT_BYTES_PER_SYNC));
+        mdb.setStrictBytesPerSync(conf.get(RocksDBOptions.STRICT_BYTES_PER_SYNC));
 
-        mdb.setCompactionReadaheadSize(
-                conf.get(RocksDBOptions.COMPACTION_READAHEAD_SIZE));
+        mdb.setCompactionReadaheadSize(conf.get(RocksDBOptions.COMPACTION_READAHEAD_SIZE));
 
-        mdb.setDeleteObsoleteFilesPeriodMicros(1000000 *
-                conf.get(RocksDBOptions.DELETE_OBSOLETE_FILE_PERIOD));
+        mdb.setDeleteObsoleteFilesPeriodMicros(
+            1000000 * conf.get(RocksDBOptions.DELETE_OBSOLETE_FILE_PERIOD));
     }
 
     if (cf != null) {
@@ -562,38 +546,30 @@ public class RocksDBStdSessions extends RocksDBSessions {
         }
 
         int numLevels = conf.get(RocksDBOptions.NUM_LEVELS);
-        List<CompressionType> compressions = conf.get(
-                RocksDBOptions.LEVELS_COMPRESSIONS);
-        E.checkArgument(compressions.isEmpty() ||
-                        compressions.size() == numLevels,
+        List<CompressionType> compressions = conf.get(RocksDBOptions.LEVELS_COMPRESSIONS);
+        E.checkArgument(compressions.isEmpty() || compressions.size() == numLevels,
                         "Elements number of '%s' must be 0 or " +
                         "be the same as '%s', but got %s != %s",
                         RocksDBOptions.LEVELS_COMPRESSIONS.name(),
-                        RocksDBOptions.NUM_LEVELS.name(),
-                        compressions.size(), numLevels);
+                        RocksDBOptions.NUM_LEVELS.name(), compressions.size(), numLevels);
 
         cf.setNumLevels(numLevels);
         cf.setCompactionStyle(conf.get(RocksDBOptions.COMPACTION_STYLE));
 
-        cf.setBottommostCompressionType(
-                conf.get(RocksDBOptions.BOTTOMMOST_COMPRESSION));
+        cf.setBottommostCompressionType(conf.get(RocksDBOptions.BOTTOMMOST_COMPRESSION));
         if (!compressions.isEmpty()) {
             cf.setCompressionPerLevel(compressions);
         }
 
-        cf.setMinWriteBufferNumberToMerge(
-                conf.get(RocksDBOptions.MIN_MEMTABLES_TO_MERGE));
+        cf.setMinWriteBufferNumberToMerge(conf.get(RocksDBOptions.MIN_MEMTABLES_TO_MERGE));
         cf.setMaxWriteBufferNumberToMaintain(
-                conf.get(RocksDBOptions.MAX_MEMTABLES_TO_MAINTAIN));
+            conf.get(RocksDBOptions.MAX_MEMTABLES_TO_MAINTAIN));
 
-        cf.setInplaceUpdateSupport(
-                conf.get(RocksDBOptions.MEMTABLE_INPLACE_UPDATE_SUPPORT));
+        cf.setInplaceUpdateSupport(conf.get(RocksDBOptions.MEMTABLE_INPLACE_UPDATE_SUPPORT));
 
-        cf.setLevelCompactionDynamicLevelBytes(
-                conf.get(RocksDBOptions.DYNAMIC_LEVEL_BYTES));
+        cf.setLevelCompactionDynamicLevelBytes(conf.get(RocksDBOptions.DYNAMIC_LEVEL_BYTES));
 
-        cf.setOptimizeFiltersForHits(
-                conf.get(RocksDBOptions.BLOOM_FILTERS_SKIP_LAST_LEVEL));
+        cf.setOptimizeFiltersForHits(conf.get(RocksDBOptions.BLOOM_FILTERS_SKIP_LAST_LEVEL));
 
         cf.setTableFormatConfig(initTableConfig(conf));
 
@@ -613,27 +589,22 @@ public class RocksDBStdSessions extends RocksDBSessions {
         mcf.setWriteBufferSize(conf.get(RocksDBOptions.MEMTABLE_SIZE));
         mcf.setMaxWriteBufferNumber(conf.get(RocksDBOptions.MAX_MEMTABLES));
 
-        mcf.setMaxBytesForLevelBase(
-                conf.get(RocksDBOptions.MAX_LEVEL1_BYTES));
-        mcf.setMaxBytesForLevelMultiplier(
-                conf.get(RocksDBOptions.MAX_LEVEL_BYTES_MULTIPLIER));
+        mcf.setMaxBytesForLevelBase(conf.get(RocksDBOptions.MAX_LEVEL1_BYTES));
+        mcf.setMaxBytesForLevelMultiplier(conf.get(RocksDBOptions.MAX_LEVEL_BYTES_MULTIPLIER));
 
-        mcf.setTargetFileSizeBase(
-                conf.get(RocksDBOptions.TARGET_FILE_SIZE_BASE));
-        mcf.setTargetFileSizeMultiplier(
-                conf.get(RocksDBOptions.TARGET_FILE_SIZE_MULTIPLIER));
+        mcf.setTargetFileSizeBase(conf.get(RocksDBOptions.TARGET_FILE_SIZE_BASE));
+        mcf.setTargetFileSizeMultiplier(conf.get(RocksDBOptions.TARGET_FILE_SIZE_MULTIPLIER));
 
         mcf.setLevel0FileNumCompactionTrigger(
-                conf.get(RocksDBOptions.LEVEL0_COMPACTION_TRIGGER));
+            conf.get(RocksDBOptions.LEVEL0_COMPACTION_TRIGGER));
         mcf.setLevel0SlowdownWritesTrigger(
-                conf.get(RocksDBOptions.LEVEL0_SLOWDOWN_WRITES_TRIGGER));
-        mcf.setLevel0StopWritesTrigger(
-                conf.get(RocksDBOptions.LEVEL0_STOP_WRITES_TRIGGER));
+            conf.get(RocksDBOptions.LEVEL0_SLOWDOWN_WRITES_TRIGGER));
+        mcf.setLevel0StopWritesTrigger(conf.get(RocksDBOptions.LEVEL0_STOP_WRITES_TRIGGER));
 
         mcf.setSoftPendingCompactionBytesLimit(
-                conf.get(RocksDBOptions.SOFT_PENDING_COMPACTION_LIMIT));
+            conf.get(RocksDBOptions.SOFT_PENDING_COMPACTION_LIMIT));
         mcf.setHardPendingCompactionBytesLimit(
-                conf.get(RocksDBOptions.HARD_PENDING_COMPACTION_LIMIT));
+            conf.get(RocksDBOptions.HARD_PENDING_COMPACTION_LIMIT));
 
         /*
          * TODO: also set memtable options:
@@ -643,11 +614,10 @@ public class RocksDBStdSessions extends RocksDBSessions {
          * #diff-cde52d1fcbcce2bc6aae27838f1d3e7e9e469ccad8aaf8f2695f939e279d7501R369
          */
         mcf.setMemtablePrefixBloomSizeRatio(
-                conf.get(RocksDBOptions.MEMTABLE_BLOOM_SIZE_RATIO));
+            conf.get(RocksDBOptions.MEMTABLE_BLOOM_SIZE_RATIO));
         mcf.setMemtableWholeKeyFiltering(
-                conf.get(RocksDBOptions.MEMTABLE_BLOOM_WHOLE_KEY_FILTERING));
-        mcf.setMemtableHugePageSize(
-                conf.get(RocksDBOptions.MEMTABL_BLOOM_HUGE_PAGE_SIZE));
+            conf.get(RocksDBOptions.MEMTABLE_BLOOM_WHOLE_KEY_FILTERING));
+        mcf.setMemtableHugePageSize(conf.get(RocksDBOptions.MEMTABL_BLOOM_HUGE_PAGE_SIZE));
 
         boolean bulkload = conf.get(RocksDBOptions.BULKLOAD_MODE);
         if (bulkload) {
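Among the options wired up above, LEVELS_COMPRESSIONS maps one CompressionType per LSM level, and the E.checkArgument guard enforces that the list length matches NUM_LEVELS. The equivalent direct RocksDB call, sketched with an assumed 7-level setup (not hugegraph's defaults):

    import java.util.Arrays;
    import java.util.List;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.CompressionType;
    import org.rocksdb.RocksDB;

    public class CompressionSketch {
        public static void main(String[] args) {
            RocksDB.loadLibrary();
            List<CompressionType> perLevel = Arrays.asList(
                    CompressionType.NO_COMPRESSION,      // L0
                    CompressionType.NO_COMPRESSION,      // L1
                    CompressionType.SNAPPY_COMPRESSION,  // L2
                    CompressionType.SNAPPY_COMPRESSION,  // L3
                    CompressionType.SNAPPY_COMPRESSION,  // L4
                    CompressionType.SNAPPY_COMPRESSION,  // L5
                    CompressionType.SNAPPY_COMPRESSION); // L6
            try (ColumnFamilyOptions cf = new ColumnFamilyOptions()) {
                cf.setNumLevels(perLevel.size());
                // The list size must equal numLevels, mirroring the check above
                cf.setCompressionPerLevel(perLevel);
            }
        }
    }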
@@ -671,8 +641,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
     public static TableFormatConfig initTableConfig(HugeConfig conf) {
         BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
 
-        tableConfig.setFormatVersion(
-                conf.get(RocksDBOptions.TABLE_FORMAT_VERSION));
+        tableConfig.setFormatVersion(conf.get(RocksDBOptions.TABLE_FORMAT_VERSION));
 
         /*
          * The index type used to lookup between data blocks:
@@ -689,17 +658,14 @@ public class RocksDBStdSessions extends RocksDBSessions {
          * The search type of point lookup can be BinarySearch or HashSearch:
          * https://github.com/facebook/rocksdb/wiki/Data-Block-Hash-Index
          */
-        tableConfig.setDataBlockIndexType(
-                conf.get(RocksDBOptions.DATA_BLOCK_SEARCH_TYPE));
+        tableConfig.setDataBlockIndexType(conf.get(RocksDBOptions.DATA_BLOCK_SEARCH_TYPE));
         tableConfig.setDataBlockHashTableUtilRatio(
-                conf.get(RocksDBOptions.DATA_BLOCK_HASH_TABLE_RATIO));
+            conf.get(RocksDBOptions.DATA_BLOCK_HASH_TABLE_RATIO));
 
         long blockSize = conf.get(RocksDBOptions.BLOCK_SIZE);
         tableConfig.setBlockSize(blockSize);
-        tableConfig.setBlockSizeDeviation(
-                conf.get(RocksDBOptions.BLOCK_SIZE_DEVIATION));
-        tableConfig.setBlockRestartInterval(
-                conf.get(RocksDBOptions.BLOCK_RESTART_INTERVAL));
+        tableConfig.setBlockSizeDeviation(conf.get(RocksDBOptions.BLOCK_SIZE_DEVIATION));
+        tableConfig.setBlockRestartInterval(conf.get(RocksDBOptions.BLOCK_RESTART_INTERVAL));
 
         // https://github.com/facebook/rocksdb/wiki/Block-Cache
         long cacheCapacity = conf.get(RocksDBOptions.BLOCK_CACHE_CAPACITY);
@@ -715,16 +681,14 @@ public class RocksDBStdSessions extends RocksDBSessions {
         if (bitsPerKey >= 0) {
             // TODO: use space-saving RibbonFilterPolicy
             boolean blockBased = conf.get(RocksDBOptions.BLOOM_FILTER_MODE);
-            tableConfig.setFilterPolicy(new BloomFilter(bitsPerKey,
-                                                        blockBased));
+            tableConfig.setFilterPolicy(new BloomFilter(bitsPerKey, blockBased));
 
-            tableConfig.setWholeKeyFiltering(
-                    conf.get(RocksDBOptions.BLOOM_FILTER_WHOLE_KEY));
+            tableConfig.setWholeKeyFiltering(conf.get(RocksDBOptions.BLOOM_FILTER_WHOLE_KEY));
 
             tableConfig.setCacheIndexAndFilterBlocks(
-                    conf.get(RocksDBOptions.CACHE_FILTER_AND_INDEX));
+                conf.get(RocksDBOptions.CACHE_FILTER_AND_INDEX));
             tableConfig.setPinL0FilterAndIndexBlocksInCache(
-                    conf.get(RocksDBOptions.PIN_L0_INDEX_AND_FILTER));
+                conf.get(RocksDBOptions.PIN_L0_INDEX_AND_FILTER));
 
             // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
             if (conf.get(RocksDBOptions.PARTITION_FILTERS_INDEXES)) {
                 tableConfig.setIndexType(IndexType.kTwoLevelIndexSearch)
                            .setPartitionFilters(true)
                            .setMetadataBlockSize(blockSize)
                            .setCacheIndexAndFilterBlocksWithHighPriority(true);
                 tableConfig.setPinTopLevelIndexAndFilter(
-                        conf.get(RocksDBOptions.PIN_TOP_INDEX_AND_FILTER));
+                    conf.get(RocksDBOptions.PIN_TOP_INDEX_AND_FILTER));
             }
         }
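initTableConfig() above is plain BlockBasedTableConfig wiring; the bloom-filter branch corresponds roughly to the sketch below. The parameter values are illustrative assumptions, not hugegraph's configured defaults:

    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.BloomFilter;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;

    public class TableConfigSketch {
        public static void main(String[] args) {
            RocksDB.loadLibrary();
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockSize(64 * 1024);
            // ~10 bits per key gives a false-positive rate around 1%
            tableConfig.setFilterPolicy(new BloomFilter(10, false));
            tableConfig.setCacheIndexAndFilterBlocks(true);
            tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
            try (Options options = new Options()) {
                options.setTableFormatConfig(tableConfig);
            }
        }
    }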
@@ -898,7 +862,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
         /**
          * Merge a record to an existing key to a table
          * For more details about merge-operator:
-         * https://github.com/facebook/rocksdb/wiki/merge-operator
+         * <a href="https://github.com/facebook/rocksdb/wiki/merge-operator">...</a>
          */
         @Override
         public void merge(String table, byte[] key, byte[] value) {
@@ -950,8 +914,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
          * Delete a record by key(or prefix with key) from a table
          */
         @Override
-        public void deletePrefix(String table, byte[] key) {
-            byte[] keyFrom = key;
+        public void deletePrefix(String table, byte[] keyFrom) {
             byte[] keyTo = Arrays.copyOf(keyFrom, keyFrom.length);
             BinarySerializer.increaseOne(keyTo);
             try (OpenedRocksDB.CFHandle cf = cf(table)) {
@@ -1044,8 +1007,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
              */
             try (OpenedRocksDB.CFHandle cf = cf(table)) {
                 ReusedRocksIterator iter = cf.newIterator();
-                return new ScanIterator(table, iter, prefix, null,
-                                        SCAN_PREFIX_BEGIN);
+                return new ScanIterator(table, iter, prefix, null, SCAN_PREFIX_BEGIN);
             }
         }
@@ -1076,8 +1038,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
     /**
      * A wrapper for RocksIterator that convert RocksDB results to std Iterator
      */
-    private static class ScanIterator implements BackendColumnIterator,
-                                                 Countable {
+    private static class ScanIterator implements BackendColumnIterator, Countable {
 
         private final String table;
         private final ReusedRocksIterator reusedIter;
@@ -1164,14 +1125,12 @@ public class RocksDBStdSessions extends RocksDBSessions {
         @SuppressWarnings("unused")
         private void dump() {
             this.seek();
-            LOG.info(">>>> scan from {}: {}{}",
-                     this.table,
-                     this.keyBegin == null ? "*" : StringEncoding.format(this.keyBegin),
-                     this.iter.isValid() ? "" : " - No data");
+            LOG.info(">>>> scan from {}: {}{}", this.table,
+                     this.keyBegin == null ? "*" : StringEncoding.format(this.keyBegin),
+                     this.iter.isValid() ? "" : " - No data");
             for (; this.iter.isValid(); this.iter.next()) {
-                LOG.info("{}={}",
-                         StringEncoding.format(this.iter.key()),
-                         StringEncoding.format(this.iter.value()));
+                LOG.info("{}={}", StringEncoding.format(this.iter.key()),
+                         StringEncoding.format(this.iter.value()));
             }
         }
 
@@ -1202,7 +1161,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
         }
 
         private void seek() {
-            if (this.keyBegin == null || this.keyBegin.length <= 0) {
+            if (this.keyBegin == null || this.keyBegin.length == 0) {
                 // Seek to the first if no `keyBegin`
                 this.iter.seekToFirst();
             } else {
@@ -1216,8 +1175,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
                 // Skip `keyBegin` if set SCAN_GT_BEGIN (key > 'xx')
                 if (this.match(Session.SCAN_GT_BEGIN) &&
                     !this.match(Session.SCAN_GTE_BEGIN)) {
-                    while (this.iter.isValid() &&
-                           Bytes.equals(this.iter.key(), this.keyBegin)) {
+                    while (this.iter.isValid() && Bytes.equals(this.iter.key(), this.keyBegin)) {
                         this.iter.next();
                     }
                 }
@@ -1254,10 +1212,8 @@ public class RocksDBStdSessions extends RocksDBSessions {
                 return Bytes.compare(key, this.keyEnd) < 0;
             }
         } else {
-            assert this.match(Session.SCAN_ANY) ||
-                   this.match(Session.SCAN_GT_BEGIN) ||
-                   this.match(Session.SCAN_GTE_BEGIN) :
-                   "Unknow scan type";
+            assert this.match(Session.SCAN_ANY) || this.match(Session.SCAN_GT_BEGIN) ||
+                   this.match(Session.SCAN_GTE_BEGIN) : "Unknown scan type";
             return true;
         }
     }
@@ -1270,8 +1226,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
                 }
             }
 
-            BackendColumn col = BackendColumn.of(this.iter.key(),
-                                                 this.iter.value());
+            BackendColumn col = BackendColumn.of(this.iter.key(), this.iter.value());
             this.iter.next();
             this.matched = false;
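ScanIterator above drives a RocksIterator with seek() plus per-key range checks, and deletePrefix() derives an exclusive upper bound by "increasing" the last byte of the prefix. The bare RocksIterator prefix-scan loop looks like this; the path and key contents are hypothetical:

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    public class PrefixScanSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            byte[] prefix = "user:".getBytes(StandardCharsets.UTF_8);
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-data");
                 RocksIterator iter = db.newIterator()) {
                for (iter.seek(prefix); iter.isValid(); iter.next()) {
                    byte[] key = iter.key();
                    if (!startsWith(key, prefix)) {
                        break; // past the last key carrying the prefix
                    }
                    System.out.println(new String(key, StandardCharsets.UTF_8));
                }
            }
        }

        private static boolean startsWith(byte[] key, byte[] prefix) {
            if (key.length < prefix.length) {
                return false;
            }
            for (int i = 0; i < prefix.length; i++) {
                if (key[i] != prefix[i]) {
                    return false;
                }
            }
            return true;
        }
    }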
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 4158c7d83..2a826f2b8 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -44,9 +44,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
-import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.backend.BackendException;
 import org.apache.hugegraph.backend.id.Id;
@@ -69,6 +66,9 @@ import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.ExecutorUtil;
 import org.apache.hugegraph.util.InsertionOrderUtil;
 import org.apache.hugegraph.util.Log;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+
 import com.google.common.collect.ImmutableList;
 
 public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.Session> {
@@ -725,7 +725,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
         readLock.lock();
         try {
             Map<String, String> uniqueSnapshotDirMaps = new HashMap<>();
-            // Every rocksdb instance should create an snapshot
+            // Every rocksdb instance should create a snapshot
             for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
                 // Like: parent_path/rocksdb-data/*, * maybe g,m,s
                 Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath();
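The snapshot loop above runs under readLock, presumably so that operations holding the corresponding write lock are excluded while every RocksDB instance is checkpointed. Stripped to a locking skeleton only (the field layout is an assumption, not hugegraph's actual code):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class SnapshotLockSketch {

        private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

        public void createSnapshot() {
            Lock readLock = this.storeLock.readLock();
            readLock.lock();
            try {
                // checkpoint every db instance here
            } finally {
                readLock.unlock();
            }
        }
    }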
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java
index 7a5af5f1a..420901591 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java
@@ -25,9 +25,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.page.PageState;
 import org.apache.hugegraph.backend.query.Aggregate;
@@ -52,6 +49,8 @@ import org.apache.hugegraph.util.Bytes;
 import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.Log;
 import org.apache.hugegraph.util.StringEncoding;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
 
 public class RocksDBTable extends BackendTable<RocksDBSessions.Session, BackendEntry> {
 
@@ -359,6 +358,7 @@ public class RocksDBTable extends BackendTable<RocksDBSessions.Session, BackendE
 
     @Override
     public byte[] position(String position) {
+        // TODO: START & END is same & be empty now? remove one?
         if (START.equals(position) || END.equals(position)) {
             return null;
         }
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java
index 06c2d91a1..6b1caab94 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java
@@ -218,6 +218,7 @@ public class RocksDBTables {
         }
 
         @Override
+        // TODO: why this method is same as super.eliminate() in RocksDBTable, del it?
         public void eliminate(RocksDBSessions.Session session, BackendEntry entry) {
             assert entry.columns().size() == 1;
             super.delete(session, entry);
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
index 393cb2ef1..b0cb2cf75 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
@@ -31,11 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.rocksdb.EnvOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileWriter;
-
 import org.apache.hugegraph.backend.BackendException;
 import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
 import org.apache.hugegraph.backend.store.rocksdb.RocksDBIngester;
@@ -44,14 +39,17 @@ import org.apache.hugegraph.backend.store.rocksdb.RocksDBStdSessions;
 import org.apache.hugegraph.config.HugeConfig;
 import org.apache.hugegraph.exception.NotSupportException;
 import org.apache.hugegraph.util.E;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileWriter;
 
 public class RocksDBSstSessions extends RocksDBSessions {
 
     private final String dataPath;
     private final Map<String, SstFileWriter> tables;
 
-    public RocksDBSstSessions(HugeConfig config, String database, String store,
-                              String dataPath) {
+    public RocksDBSstSessions(HugeConfig config, String database, String store, String dataPath) {
         super(config, database, store);
         this.dataPath = dataPath;
 
@@ -63,8 +61,7 @@ public class RocksDBSstSessions extends RocksDBSessions {
         }
     }
 
-    public RocksDBSstSessions(HugeConfig config, String dataPath,
-                              String database, String store,
+    public RocksDBSstSessions(HugeConfig config, String dataPath, String database, String store,
                               List<String> tableNames) throws RocksDBException {
         this(config, dataPath, database, store);
         for (String table : tableNames) {
@@ -126,17 +123,17 @@ public class RocksDBSstSessions extends RocksDBSessions {
     }
 
     @Override
-    public synchronized void dropTable(String... tables)
-                                       throws RocksDBException {
+    public synchronized void dropTable(String... tables) {
         for (String table : tables) {
             this.dropTable(table);
         }
     }
 
-    public void dropTable(String table) throws RocksDBException {
-        SstFileWriter sst = this.tables.remove(table);
-        assert sst == null || !sst.isOwningHandle() :
-               "Please close table before drop to ensure call sst.finish()";
+    public void dropTable(String table) {
+        try (SstFileWriter sst = this.tables.remove(table)) {
+            assert sst == null || !sst.isOwningHandle() : "Please close table before drop to " +
+                                                          "ensure call sst.finish()";
+        }
     }
 
     @Override
@@ -176,8 +173,7 @@ public class RocksDBSstSessions extends RocksDBSessions {
     }
 
     @Override
-    public String hardLinkSnapshot(String snapshotPath)
-                                   throws RocksDBException {
+    public String hardLinkSnapshot(String snapshotPath) {
         throw new UnsupportedOperationException("hardLinkSnapshot");
     }
 
@@ -264,7 +260,7 @@ public class RocksDBSstSessions extends RocksDBSessions {
         @Override
         public Integer commit() {
             int count = this.batch.size();
-            if (count <= 0) {
+            if (count == 0) {
                 return 0;
             }
 
@@ -277,9 +273,10 @@ public class RocksDBSstSessions extends RocksDBSessions {
                     }
 
                     // TODO: limit individual SST file size
-                    SstFileWriter sst = table(table.getKey());
-                    for (Pair<byte[], byte[]> change : table.getValue()) {
-                        sst.put(change.getKey(), change.getValue());
+                    try (SstFileWriter sst = table(table.getKey())) {
+                        for (Pair<byte[], byte[]> change : table.getValue()) {
+                            sst.put(change.getKey(), change.getValue());
+                        }
                     }
                 }
             } catch (RocksDBException e) {
@@ -344,7 +341,7 @@ public class RocksDBSstSessions extends RocksDBSessions {
         /**
          * Merge a record to an existing key to a table
         * For more details about merge-operator:
-         * https://github.com/facebook/rocksdb/wiki/merge-operator
+         * <a href="https://github.com/facebook/rocksdb/wiki/merge-operator">...</a>
         */
        @Override
        public void merge(String table, byte[] key, byte[] value) {
@@ -431,10 +428,8 @@ public class RocksDBSstSessions extends RocksDBSessions {
         * Scan records by key range from a table
         */
        @Override
-        public BackendColumnIterator scan(String table,
-                                          byte[] keyFrom,
-                                          byte[] keyTo,
-                                          int scanType) {
+        public BackendColumnIterator scan(String table, byte[] keyFrom,
+                                          byte[] keyTo, int scanType) {
            assert !this.hasChanges();
            return BackendColumnIterator.empty();
        }
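RocksDBSstSessions writes each table through an SstFileWriter whose finish() seals the file for later ingestion by RocksDBIngester (shown earlier). Minimal standalone usage; the output path is hypothetical (its directory must exist), and keys must be added in ascending order:

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    public class SstWriterSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (EnvOptions env = new EnvOptions();
                 Options options = new Options();
                 SstFileWriter sst = new SstFileWriter(env, options)) {
                sst.open("/tmp/bulk/000001.sst");
                sst.put("a".getBytes(StandardCharsets.UTF_8),
                        "1".getBytes(StandardCharsets.UTF_8));
                sst.put("b".getBytes(StandardCharsets.UTF_8),
                        "2".getBytes(StandardCharsets.UTF_8));
                sst.finish(); // seals the file; required before ingestion
            }
        }
    }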
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java
index c88cd4970..4ec40ee03 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java
@@ -19,8 +19,6 @@ package org.apache.hugegraph.backend.store.rocksdbsst;
 
 import java.util.List;
 
-import org.rocksdb.RocksDBException;
-
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.store.BackendStoreProvider;
 import org.apache.hugegraph.backend.store.rocksdb.RocksDBSessions;
@@ -28,6 +26,7 @@ import org.apache.hugegraph.backend.store.rocksdb.RocksDBStore;
 import org.apache.hugegraph.backend.store.rocksdb.RocksDBTables;
 import org.apache.hugegraph.config.HugeConfig;
 import org.apache.hugegraph.type.HugeType;
+import org.rocksdb.RocksDBException;
 
 public abstract class RocksDBSstStore extends RocksDBStore {
 
@@ -42,8 +41,7 @@ public abstract class RocksDBSstStore extends RocksDBStore {
                                               List<String> tableNames)
                                               throws RocksDBException {
         if (tableNames == null) {
-            return new RocksDBSstSessions(config, this.database(),
-                                          this.store(), dataPath);
+            return new RocksDBSstSessions(config, this.database(), this.store(), dataPath);
         } else {
             return new RocksDBSstSessions(config, this.database(),
                                           this.store(), dataPath, tableNames);
@@ -95,20 +93,17 @@ public abstract class RocksDBSstStore extends RocksDBStore {
 
     @Override
     public Id nextId(HugeType type) {
-        throw new UnsupportedOperationException(
-              "RocksDBSstGraphStore.nextId()");
+        throw new UnsupportedOperationException("RocksDBSstGraphStore.nextId()");
     }
 
     @Override
     public void increaseCounter(HugeType type, long increment) {
-        throw new UnsupportedOperationException(
-              "RocksDBSstGraphStore.increaseCounter()");
+        throw new UnsupportedOperationException("RocksDBSstGraphStore.increaseCounter()");
     }
 
     @Override
     public long getCounter(HugeType type) {
-        throw new UnsupportedOperationException(
-              "RocksDBSstGraphStore.getCounter()");
+        throw new UnsupportedOperationException("RocksDBSstGraphStore.getCounter()");
     }
   }
 }
