This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 16fa6a16154 [refact](bdbje) Refact `BDBEnvironment` and `BDBJEJournal`
(#27778)
16fa6a16154 is described below
commit 16fa6a1615406890356e1adae69eed6e0909e492
Author: Lei Zhang <[email protected]>
AuthorDate: Sun Dec 3 23:10:07 2023 +0800
[refact](bdbje) Refact `BDBEnvironment` and `BDBJEJournal` (#27778)
* Add more ut about "org.apache.doris.journal.bdbje"
* Make tiny refactor about "org.apache.doris.journal.bdbje"
---
.../src/main/java/org/apache/doris/DorisFE.java | 2 +-
.../apache/doris/ha/BDBStateChangeListener.java | 6 +-
.../apache/doris/journal/bdbje/BDBDebugger.java | 41 ++-
.../apache/doris/journal/bdbje/BDBEnvironment.java | 47 ++-
.../apache/doris/journal/bdbje/BDBJEJournal.java | 23 +-
...{BDBJEJournalTest.java => BDBDebuggerTest.java} | 91 ++---
.../doris/journal/bdbje/BDBEnvironmentTest.java | 396 +++++++++++++++++++--
.../doris/journal/bdbje/BDBJEJournalTest.java | 16 +-
.../doris/journal/bdbje/BDBJournalCursorTest.java | 118 ++++++
.../{bdb => journal/bdbje}/BDBToolOptionsTest.java | 4 +-
.../doris/{bdb => journal/bdbje}/BDBToolTest.java | 4 +-
.../apache/doris/journal/bdbje/TimestampTest.java | 90 +++++
12 files changed, 676 insertions(+), 162 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 224d82c4016..dfadde6626c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -152,7 +152,7 @@ public class DorisFE {
if (Config.enable_bdbje_debug_mode) {
// Start in BDB Debug mode
- BDBDebugger.get().startDebugMode(dorisHomeDir);
+ BDBDebugger.get().startDebugMode(Config.meta_dir + "/bdb");
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java
b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java
index d643646fc60..2ff66a5aad0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java
@@ -28,8 +28,10 @@ import org.apache.logging.log4j.Logger;
public class BDBStateChangeListener implements StateChangeListener {
public static final Logger LOG =
LogManager.getLogger(BDBStateChangeListener.class);
+ private final boolean isElectable;
- public BDBStateChangeListener() {
+ public BDBStateChangeListener(boolean isElectable) {
+ this.isElectable = isElectable;
}
@Override
@@ -41,7 +43,7 @@ public class BDBStateChangeListener implements
StateChangeListener {
break;
}
case REPLICA: {
- if (Env.getCurrentEnv().isElectable()) {
+ if (isElectable) {
newType = FrontendNodeType.FOLLOWER;
} else {
newType = FrontendNodeType.OBSERVER;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
index ae48526515a..97009edff15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
@@ -58,6 +58,10 @@ public class BDBDebugger {
private static final Logger LOG = LogManager.getLogger(BDBDebugger.class);
private BDBDebugEnv debugEnv;
+ private static class SingletonHolder {
+ private static final BDBDebugger INSTANCE = new BDBDebugger();
+ }
+
public static BDBDebugger get() {
return SingletonHolder.INSTANCE;
}
@@ -65,10 +69,10 @@ public class BDBDebugger {
/**
* Start in BDB Debug mode.
*/
- public void startDebugMode(String dorisHomeDir) {
+ public void startDebugMode(String bdbHome) {
try {
- initDebugEnv();
- startService(dorisHomeDir);
+ initDebugEnv(bdbHome);
+ startService();
while (true) {
Thread.sleep(2000);
}
@@ -78,8 +82,13 @@ public class BDBDebugger {
}
}
+ private void initDebugEnv(String bdbHome) throws BDBDebugException {
+ debugEnv = new BDBDebugEnv(bdbHome);
+ debugEnv.init();
+ }
+
// Only start MySQL and HttpServer
- private void startService(String dorisHomeDir) throws Exception {
+ private void startService() throws Exception {
// HTTP server
HttpServer httpServer = new HttpServer();
@@ -94,19 +103,10 @@ public class BDBDebugger {
ThreadPoolManager.registerAllThreadPoolMetric();
}
- private void initDebugEnv() throws BDBDebugException {
- debugEnv = new BDBDebugEnv(Config.meta_dir + "/bdb/");
- debugEnv.init();
- }
-
public BDBDebugEnv getEnv() {
return debugEnv;
}
- private static class SingletonHolder {
- private static final BDBDebugger INSTANCE = new BDBDebugger();
- }
-
/**
* A wrapper class of the BDBJE environment, used to obtain information in
bdbje.
*/
@@ -144,7 +144,9 @@ public class BDBDebugger {
dbConfig.setAllowCreate(false);
dbConfig.setReadOnly(true);
Database db = env.openDatabase(null, dbName, dbConfig);
- return db.count();
+ long journalNumber = db.count();
+ db.close();
+ return journalNumber;
}
/**
@@ -172,6 +174,8 @@ public class BDBDebugger {
Long id = idBinding.entryToObject(key);
journalIds.add(id);
}
+ cursor.close();
+ db.close();
} catch (Exception e) {
LOG.warn("failed to get journal ids of {}", dbName, e);
throw new BDBDebugException("failed to get journal ids of
database " + dbName, e);
@@ -205,6 +209,7 @@ public class BDBDebugger {
// get the journal
OperationStatus status = db.get(null, key, value,
LockMode.READ_COMMITTED);
+ db.close();
if (status == OperationStatus.SUCCESS) {
byte[] retData = value.getData();
DataInputStream in = new DataInputStream(new
ByteArrayInputStream(retData));
@@ -223,6 +228,14 @@ public class BDBDebugger {
MetaContext.remove();
return entityWrapper;
}
+
+ public void close() {
+ try {
+ env.close();
+ } catch (Exception e) {
+ LOG.warn("exception:", e);
+ }
+ }
}
public static class JournalEntityWrapper {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
index f2d1c1825ca..7674cf6f597 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
@@ -19,7 +19,6 @@ package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.BDBStateChangeListener;
import org.apache.doris.ha.FrontendNodeType;
@@ -72,10 +71,8 @@ public class BDBEnvironment {
private static final int MEMORY_CACHE_PERCENT = 20;
private static final List<String> BDBJE_LOG_LEVEL =
ImmutableList.of("OFF", "SEVERE", "WARNING",
"INFO", "CONFIG", "FINE", "FINER", "FINEST", "ALL");
-
public static final String PALO_JOURNAL_GROUP = "PALO_JOURNAL_GROUP";
-
private ReplicatedEnvironment replicatedEnvironment;
private EnvironmentConfig environmentConfig;
private ReplicationConfig replicationConfig;
@@ -84,15 +81,19 @@ public class BDBEnvironment {
private ReentrantReadWriteLock lock;
private List<Database> openedDatabases;
- public BDBEnvironment() {
+ private final boolean isElectable;
+ private final boolean metadataFailureRecovery;
+
+ public BDBEnvironment(boolean isElectable, boolean
metadataFailureRecovery) {
openedDatabases = new ArrayList<Database>();
this.lock = new ReentrantReadWriteLock(true);
+ this.isElectable = isElectable;
+ this.metadataFailureRecovery = metadataFailureRecovery;
}
// The setup() method opens the environment and database
public void setup(File envHome, String selfNodeName, String
selfNodeHostPort,
- String helperHostPort, boolean isElectable) {
- boolean metadataFailureRecovery = null !=
System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+ String helperHostPort) {
// Almost never used, just in case the master can not restart
if (metadataFailureRecovery) {
if (!isElectable) {
@@ -112,7 +113,7 @@ public class BDBEnvironment {
replicationConfig.setNodeHostPort(selfNodeHostPort);
replicationConfig.setHelperHosts(helperHostPort);
replicationConfig.setGroupName(PALO_JOURNAL_GROUP);
-
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT,
"10");
+
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT,
"10 s");
replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms,
TimeUnit.MILLISECONDS);
replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
String.valueOf(Config.txn_rollback_limit));
@@ -170,6 +171,9 @@ public class BDBEnvironment {
// open environment and epochDB
for (int i = 0; i < RETRY_TIME; i++) {
try {
+ if (replicatedEnvironment != null) {
+ this.close();
+ }
// open the environment
replicatedEnvironment = new ReplicatedEnvironment(envHome,
replicationConfig, environmentConfig);
@@ -178,12 +182,13 @@ public class BDBEnvironment {
Env.getCurrentEnv().setHaProtocol(protocol);
// start state change listener
- StateChangeListener listener = new BDBStateChangeListener();
+ StateChangeListener listener = new
BDBStateChangeListener(isElectable);
replicatedEnvironment.setStateChangeListener(listener);
// open epochDB. the first parameter null means auto-commit
epochDB = replicatedEnvironment.openDatabase(null, "epochDB",
dbConfig);
break;
} catch (InsufficientLogException insufficientLogEx) {
+ LOG.info("i:{} insufficientLogEx:", i, insufficientLogEx);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false); // delete obsolete log files.
@@ -193,6 +198,7 @@ public class BDBEnvironment {
// default selection of providers is not suitable.
restore.execute(insufficientLogEx, config);
} catch (DatabaseException e) {
+ LOG.info("i:{} exception:", i, e);
if (i < RETRY_TIME - 1) {
try {
Thread.sleep(5 * 1000);
@@ -381,6 +387,7 @@ public class BDBEnvironment {
if (epochDB != null) {
try {
epochDB.close();
+ epochDB = null;
} catch (DatabaseException exception) {
LOG.error("Error closing db {} will exit",
epochDB.getDatabaseName(), exception);
}
@@ -390,19 +397,7 @@ public class BDBEnvironment {
try {
// Finally, close the store and environment.
replicatedEnvironment.close();
- } catch (DatabaseException exception) {
- LOG.error("Error closing replicatedEnvironment", exception);
- }
- }
- }
-
- // Close environment
- public void closeReplicatedEnvironment() {
- if (replicatedEnvironment != null) {
- try {
- openedDatabases.clear();
- // Finally, close the store and environment.
- replicatedEnvironment.close();
+ replicatedEnvironment = null;
} catch (DatabaseException exception) {
LOG.error("Error closing replicatedEnvironment", exception);
}
@@ -413,18 +408,22 @@ public class BDBEnvironment {
public void openReplicatedEnvironment(File envHome) {
for (int i = 0; i < RETRY_TIME; i++) {
try {
+ if (replicatedEnvironment != null) {
+ this.close();
+ }
// open the environment
replicatedEnvironment =
new ReplicatedEnvironment(envHome, replicationConfig,
environmentConfig);
// start state change listener
- StateChangeListener listener = new BDBStateChangeListener();
+ StateChangeListener listener = new
BDBStateChangeListener(isElectable);
replicatedEnvironment.setStateChangeListener(listener);
// open epochDB. the first parameter null means auto-commit
epochDB = replicatedEnvironment.openDatabase(null, "epochDB",
dbConfig);
break;
} catch (DatabaseException e) {
+ LOG.info("i:{} exception:", i, e);
if (i < RETRY_TIME - 1) {
try {
Thread.sleep(5 * 1000);
@@ -439,7 +438,7 @@ public class BDBEnvironment {
}
}
- private SyncPolicy getSyncPolicy(String policy) {
+ private static SyncPolicy getSyncPolicy(String policy) {
if (policy.equalsIgnoreCase("SYNC")) {
return Durability.SyncPolicy.SYNC;
}
@@ -450,7 +449,7 @@ public class BDBEnvironment {
return Durability.SyncPolicy.WRITE_NO_SYNC;
}
- private ReplicaAckPolicy getAckPolicy(String policy) {
+ private static ReplicaAckPolicy getAckPolicy(String policy) {
if (policy.equalsIgnoreCase("ALL")) {
return Durability.ReplicaAckPolicy.ALL;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 395f5b0467c..01db7f53aee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -18,6 +18,7 @@
package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.NetUtils;
@@ -75,14 +76,6 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE
IGNORE THIS LINE: B
private AtomicLong nextJournalId = new AtomicLong(1);
public BDBJEJournal(String nodeName) {
- initBDBEnv(nodeName);
- }
-
- /*
- * Initialize bdb environment.
- * node name is ip_port (the port is edit_log_port)
- */
- private void initBDBEnv(String nodeName) {
environmentPath = Env.getServingEnv().getBdbDir();
HostInfo selfNode = Env.getServingEnv().getSelfNode();
selfNodeName = nodeName;
@@ -327,13 +320,14 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
public synchronized void open() {
if (bdbEnvironment == null) {
File dbEnv = new File(environmentPath);
- bdbEnvironment = new BDBEnvironment();
+
+ boolean metadataFailureRecovery = null !=
System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+ bdbEnvironment = new
BDBEnvironment(Env.getServingEnv().isElectable(), metadataFailureRecovery);
HostInfo helperNode = Env.getServingEnv().getHelperNode();
String helperHostPort =
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(),
helperNode.getPort());
try {
- bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort,
helperHostPort,
- Env.getServingEnv().isElectable());
+ bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort,
helperHostPort);
} catch (Exception e) {
if (e instanceof DatabaseNotFoundException) {
LOG.error("It is not allowed to set
metadata_failure_recovery"
@@ -380,7 +374,7 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
reSetupBdbEnvironment(insufficientLogEx);
} catch (RollbackException rollbackEx) {
LOG.warn("catch rollback log exception. will reopen the
ReplicatedEnvironment.", rollbackEx);
- bdbEnvironment.closeReplicatedEnvironment();
+ bdbEnvironment.close();
bdbEnvironment.openReplicatedEnvironment(new
File(environmentPath));
}
}
@@ -414,8 +408,7 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
bdbEnvironment.close();
bdbEnvironment.setup(new File(environmentPath), selfNodeName,
selfNodeHostPort,
- NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(),
helperNode.getPort()),
- Env.getServingEnv().isElectable());
+ NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(),
helperNode.getPort()));
}
@Override
@@ -506,7 +499,7 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
} catch (RollbackException rollbackEx) {
if (!Env.isCheckpointThread()) {
LOG.warn("catch rollback log exception. will reopen the
ReplicatedEnvironment.", rollbackEx);
- bdbEnvironment.closeReplicatedEnvironment();
+ bdbEnvironment.close();
bdbEnvironment.openReplicatedEnvironment(new
File(environmentPath));
} else {
throw rollbackEx;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java
similarity index 64%
copy from
fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
copy to
fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java
index 93522a5308e..27c7efc4301 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java
@@ -18,10 +18,9 @@
package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-import org.apache.doris.journal.JournalCursor;
+import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.persist.OperationType;
import org.apache.doris.system.SystemInfoService.HostInfo;
@@ -36,8 +35,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-// import org.junit.jupiter.api.RepeatedTest; only for debug
+import org.junit.jupiter.api.RepeatedTest;
import java.io.DataOutput;
import java.io.File;
@@ -51,8 +49,8 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should
use uppercase
- private static final Logger LOG =
LogManager.getLogger(BDBJEJournalTest.class);
+public class BDBDebuggerTest {
+ private static final Logger LOG =
LogManager.getLogger(BDBDebuggerTest.class);
private static List<File> tmpDirs = new ArrayList<>();
public static File createTmpDir() throws Exception {
@@ -74,7 +72,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
@AfterAll
public static void cleanUp() throws Exception {
for (File dir : tmpDirs) {
- LOG.info("deleteTmpDir path {}", dir.getAbsolutePath());
+ LOG.debug("deleteTmpDir path {}", dir.getAbsolutePath());
FileUtils.deleteDirectory(dir);
}
}
@@ -98,8 +96,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
return port;
}
- // @RepeatedTest(100) only for debug
- @Test
+ @RepeatedTest(1)
public void testNormal() throws Exception {
int port = findValidPort();
Preconditions.checkArgument(((port > 0) && (port < 65535)));
@@ -161,71 +158,23 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
};
journal.write(OperationType.OP_TIMESTAMP, writable);
}
-
- Assertions.assertEquals(10, journal.getMaxJournalId());
- Assertions.assertEquals(10, journal.getJournalNum());
- Assertions.assertEquals(1, journal.getMinJournalId());
- Assertions.assertEquals(0, journal.getFinalizedJournalId());
-
- LOG.debug("journal.getDatabaseNames(): {}",
journal.getDatabaseNames());
- Assertions.assertEquals(1, journal.getDatabaseNames().size());
- Assertions.assertEquals(1, journal.getDatabaseNames().get(0));
-
JournalEntity journalEntity = journal.read(1);
Assertions.assertEquals(OperationType.OP_TIMESTAMP,
journalEntity.getOpCode());
-
- for (int i = 10; i < 50; i++) {
- if (i % 10 == 0) {
- journal.rollJournal();
- }
- String data = "OperationType.OP_TIMESTAMP";
- Writable writable = new Writable() {
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, data);
- }
- };
- journal.write(OperationType.OP_TIMESTAMP, writable);
- }
-
- Assertions.assertEquals(50, journal.getMaxJournalId());
- Assertions.assertEquals(10, journal.getJournalNum());
- Assertions.assertEquals(1, journal.getMinJournalId());
- Assertions.assertEquals(40, journal.getFinalizedJournalId());
-
- LOG.debug("journal.getDatabaseNames(): {}",
journal.getDatabaseNames());
- Assertions.assertEquals(5, journal.getDatabaseNames().size());
- Assertions.assertEquals(41, journal.getDatabaseNames().get(4));
-
- JournalCursor cursor = journal.read(1, 50);
- Assertions.assertNotNull(cursor);
- for (int i = 1; i < 50; i++) {
- Pair<Long, JournalEntity> kv = cursor.next();
- Assertions.assertNotNull(kv);
- JournalEntity entity = kv.second;
- Assertions.assertEquals(OperationType.OP_TIMESTAMP,
entity.getOpCode());
- }
-
journal.close();
- Assertions.assertEquals(null, journal.getBDBEnvironment());
- journal.open();
- Assertions.assertTrue(journal.getBDBEnvironment() != null);
- // BDBEnvrinment need several seconds election from unknown to master
- for (int i = 0; i < 10; i++) {
- if
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
- .equals(ReplicatedEnvironment.State.MASTER)) {
- break;
- }
- Thread.sleep(1000);
- }
-
- Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
-
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
- journal.deleteJournals(21);
- LOG.info("journal.getDatabaseNames(): {}", journal.getDatabaseNames());
- Assertions.assertEquals(3, journal.getDatabaseNames().size());
- Assertions.assertEquals(21, journal.getDatabaseNames().get(0));
- journal.close();
+ Deencapsulation.invoke(BDBDebugger.get(), "initDebugEnv",
tmpDir.getAbsolutePath());
+ // BDBDebugger.BDBDebugEnv bdbDebugEnv = new
BDBDebugger.BDBDebugEnv(tmpDir.getAbsolutePath());
+ // bdbDebugEnv.init();
+ BDBDebugger.BDBDebugEnv bdbDebugEnv = BDBDebugger.get().getEnv();
+
+ LOG.info("{}|{}|{}", bdbDebugEnv.listDbNames(),
bdbDebugEnv.getJournalIds("1"),
+ bdbDebugEnv.getJournalNumber("1"));
+ Assertions.assertEquals(2, bdbDebugEnv.listDbNames().size());
+ Assertions.assertEquals(10, bdbDebugEnv.getJournalIds("1").size());
+ Assertions.assertEquals(10, bdbDebugEnv.getJournalNumber("1"));
+ BDBDebugger.JournalEntityWrapper entityWrapper =
bdbDebugEnv.getJournalEntity("1", 5L);
+ Assertions.assertEquals(5, entityWrapper.journalId);
+ Assertions.assertEquals(OperationType.OP_TIMESTAMP,
entityWrapper.entity.getOpCode());
+ bdbDebugEnv.close();
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
index 98f1fc57f99..09b10b604b8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
@@ -18,19 +18,31 @@
package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.system.Frontend;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Durability;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import mockit.Mock;
+import mockit.MockUp;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.RepeatedTest;
+// import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
@@ -38,29 +50,44 @@ import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
public class BDBEnvironmentTest {
private static final Logger LOG =
LogManager.getLogger(BDBEnvironmentTest.class);
- private static List<File> tmpDirs = new ArrayList<>();
+ private static List<String> tmpDirs = new ArrayList<>();
- public static File createTmpDir() throws Exception {
+ public static String createTmpDir() throws Exception {
String dorisHome = System.getenv("DORIS_HOME");
+ if (Strings.isNullOrEmpty(dorisHome)) {
+ dorisHome =
Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString();
+ }
Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome));
- File dir = Files.createTempDirectory(Paths.get(dorisHome, "fe",
"mocked"), "BDBEnvironmentTest").toFile();
+ Path mockDir = Paths.get(dorisHome, "fe", "mocked");
+ if (!Files.exists(mockDir)) {
+ Files.createDirectories(mockDir);
+ }
+ UUID uuid = UUID.randomUUID();
+ File dir = Files.createDirectories(Paths.get(dorisHome, "fe",
"mocked", "BDBEnvironmentTest-" + uuid.toString())).toFile();
LOG.debug("createTmpDir path {}", dir.getAbsolutePath());
- tmpDirs.add(dir);
- return dir;
+ tmpDirs.add(dir.getAbsolutePath());
+ return dir.getAbsolutePath();
+ }
+
+ @BeforeAll
+ public static void startUp() throws Exception {
+ Config.bdbje_file_logging_level = "ALL";
}
@AfterAll
public static void cleanUp() throws Exception {
- for (File dir : tmpDirs) {
- LOG.info("deleteTmpDir path {}", dir.getAbsolutePath());
- FileUtils.deleteDirectory(dir);
+ for (String dir : tmpDirs) {
+ LOG.debug("deleteTmpDir path {}", dir);
+ FileUtils.deleteDirectory(new File(dir));
}
}
@@ -90,15 +117,16 @@ public class BDBEnvironmentTest {
return byteArray;
}
- @Test
+ // @Test
+ @RepeatedTest(1)
public void testSetup() throws Exception {
int port = findValidPort();
String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
String selfNodeHostPort = "127.0.0.1:" + port;
LOG.debug("selfNodeName:{}, selfNodeHostPort:{}", selfNodeName,
selfNodeHostPort);
- BDBEnvironment bdbEnvironment = new BDBEnvironment();
- bdbEnvironment.setup(createTmpDir(), selfNodeName, selfNodeHostPort,
selfNodeHostPort, true);
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(new File(createTmpDir()), selfNodeName,
selfNodeHostPort, selfNodeHostPort);
String dbName = "testEnvironment";
Database db = bdbEnvironment.openDatabase(dbName);
@@ -113,6 +141,8 @@ public class BDBEnvironmentTest {
// Remove database
bdbEnvironment.removeDatabase(dbName);
+ // Rmove database twice will get DatabaseNotFoundException
+ bdbEnvironment.removeDatabase(dbName);
Exception exception =
Assertions.assertThrows(IllegalStateException.class, () -> {
db.put(null, key, value);
});
@@ -122,6 +152,34 @@ public class BDBEnvironmentTest {
LOG.debug("exception:", exception);
Assertions.assertTrue(actualMessage.contains(expectedMessage));
+ Database epochDb = bdbEnvironment.getEpochDB();
+ Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.put(null,
key, value));
+ DatabaseEntry readValue2 = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.get(null,
key, readValue2, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new
String(readValue2.getData()));
+
+ new MockUp<Env>() {
+ int i = 0;
+ @Mock
+ public List<Frontend> getFrontends(FrontendNodeType nodeType) {
+ ArrayList<Frontend> frontends = new ArrayList<Frontend>();
+ if (i == 0) {
+ i++;
+ return frontends;
+ }
+ Frontend frontend = new Frontend(FrontendNodeType.FOLLOWER,
selfNodeName,
+ "127.0.0.1", port);
+ frontend.setIsAlive(true);
+ frontends.add(frontend);
+ return frontends;
+ }
+ };
+
+ ReplicationGroupAdmin replicationGroupAdmin =
bdbEnvironment.getReplicationGroupAdmin();
+ Assertions.assertNull(replicationGroupAdmin);
+ replicationGroupAdmin = bdbEnvironment.getReplicationGroupAdmin();
+ Assertions.assertNotNull(replicationGroupAdmin);
+
bdbEnvironment.close();
exception = Assertions.assertThrows(IllegalStateException.class, () ->
{
db.put(null, key, value);
@@ -132,20 +190,101 @@ public class BDBEnvironmentTest {
Assertions.assertTrue(actualMessage.contains(expectedMessage));
}
+ // @Test
+ @RepeatedTest(1)
+ public void testSetupTwice() throws Exception {
+ int port = findValidPort();
+ String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ String selfNodeHostPort = "127.0.0.1:" + port;
+ File homeFile = new File(createTmpDir());
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+
+ bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+ bdbEnvironment.close();
+ }
+
+ // @Test
+ @RepeatedTest(1)
+ public void testMetadataRecovery() throws Exception {
+ int port = findValidPort();
+ String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ String selfNodeHostPort = "127.0.0.1:" + port;
+
+ File recoveryFile = new File(createTmpDir());
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(recoveryFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+
+ String dbName = "testMetadataRecovery";
+ Database db = bdbEnvironment.openDatabase(dbName);
+ DatabaseEntry key = new DatabaseEntry(randomBytes());
+ DatabaseEntry value = new DatabaseEntry(randomBytes());
+
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key,
value));
+
+ DatabaseEntry readValue = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
+ bdbEnvironment.close();
+
+ // recovery mode
+ BDBEnvironment bdbEnvironment2 = new BDBEnvironment(true, true);
+ bdbEnvironment2.setup(recoveryFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+ Database db2 = bdbEnvironment2.openDatabase(dbName);
+
+ DatabaseEntry readValue2 = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key,
readValue2, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new
String(readValue2.getData()));
+ bdbEnvironment2.close();
+ }
+
+ // @Test
+ @RepeatedTest(1)
+ public void testOpenReplicatedEnvironmentTwice() throws Exception {
+ int port = findValidPort();
+ String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ String selfNodeHostPort = "127.0.0.1:" + port;
+
+ File homeFile = new File(createTmpDir());
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+
+ String dbName = "testMetadataRecovery";
+ Database db = bdbEnvironment.openDatabase(dbName);
+ DatabaseEntry key = new DatabaseEntry(randomBytes());
+ DatabaseEntry value = new DatabaseEntry(randomBytes());
+
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key,
value));
+
+ DatabaseEntry readValue = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
+ bdbEnvironment.close();
+
+ bdbEnvironment.openReplicatedEnvironment(homeFile);
+ bdbEnvironment.openReplicatedEnvironment(homeFile);
+ Database db2 = bdbEnvironment.openDatabase(dbName);
+ DatabaseEntry readValue2 = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key,
readValue2, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new
String(readValue2.getData()));
+ bdbEnvironment.close();
+ }
+
/**
* Test build a BDBEnvironment cluster (1 master + 2 follower + 1 observer)
* @throws Exception
*/
- @Test
+ // @Test
+ @RepeatedTest(1)
public void testCluster() throws Exception {
int masterPort = findValidPort();
String masterNodeName = Env.genFeNodeName("127.0.0.1", masterPort,
false);
String masterNodeHostPort = "127.0.0.1:" + masterPort;
LOG.debug("masterNodeName:{}, masterNodeHostPort:{}", masterNodeName,
masterNodeHostPort);
- BDBEnvironment masterEnvironment = new BDBEnvironment();
- File masterDir = createTmpDir();
- masterEnvironment.setup(masterDir, masterNodeName, masterNodeHostPort,
masterNodeHostPort, true);
+ BDBEnvironment masterEnvironment = new BDBEnvironment(true, false);
+ File masterDir = new File(createTmpDir());
+ masterEnvironment.setup(masterDir, masterNodeName, masterNodeHostPort,
masterNodeHostPort);
List<BDBEnvironment> followerEnvironments = new ArrayList<>();
List<File> followerDirs = new ArrayList<>();
@@ -155,10 +294,10 @@ public class BDBEnvironmentTest {
String followerNodeHostPort = "127.0.0.1:" + followerPort;
LOG.debug("followerNodeName{}:{}, followerNodeHostPort{}:{}", i,
i, followerNodeName, followerNodeHostPort);
- BDBEnvironment followerEnvironment = new BDBEnvironment();
- File followerDir = createTmpDir();
+ BDBEnvironment followerEnvironment = new BDBEnvironment(true,
false);
+ File followerDir = new File(createTmpDir());
followerDirs.add(followerDir);
- followerEnvironment.setup(followerDir, followerNodeName,
followerNodeHostPort, masterNodeHostPort, true);
+ followerEnvironment.setup(followerDir, followerNodeName,
followerNodeHostPort, masterNodeHostPort);
followerEnvironments.add(followerEnvironment);
}
@@ -167,9 +306,9 @@ public class BDBEnvironmentTest {
String observerNodeHostPort = "127.0.0.1:" + observerPort;
LOG.debug("observerNodeName:{}, observerNodeHostPort:{}",
observerNodeName, observerNodeHostPort);
- BDBEnvironment observerEnvironment = new BDBEnvironment();
- File observerDir = createTmpDir();
- observerEnvironment.setup(observerDir, observerNodeName,
observerNodeHostPort, masterNodeHostPort, false);
+ BDBEnvironment observerEnvironment = new BDBEnvironment(false, false);
+ File observerDir = new File(createTmpDir());
+ observerEnvironment.setup(observerDir, observerNodeName,
observerNodeHostPort, masterNodeHostPort);
String dbName = "1234";
Database masterDb = masterEnvironment.openDatabase(dbName);
@@ -207,4 +346,217 @@ public class BDBEnvironmentTest {
environment.close(); });
masterEnvironment.close();
}
+
+ class NodeInfo {
+ public String name;
+ public String hostPort;
+ public String dir;
+
+ NodeInfo(String name, String hostPort, String dir) {
+ this.name = name;
+ this.hostPort = hostPort;
+ this.dir = dir;
+ }
+ }
+
+ private Pair<BDBEnvironment, NodeInfo>
findMaster(List<Pair<BDBEnvironment, NodeInfo>> followersInfo)
+ throws Exception {
+ NodeInfo masterNode = null;
+ BDBEnvironment masterEnvironment = null;
+ boolean electionSuccess = true;
+ for (int i = 0; i < 10; i++) {
+ electionSuccess = true;
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.first.getReplicatedEnvironment().getState()
+ .equals(ReplicatedEnvironment.State.MASTER)) {
+ masterEnvironment = entryPair.first;
+ masterNode = entryPair.second;
+ }
+ if (!entryPair.first.getReplicatedEnvironment().getState()
+ .equals(ReplicatedEnvironment.State.MASTER)
+ &&
!entryPair.first.getReplicatedEnvironment().getState()
+ .equals(ReplicatedEnvironment.State.REPLICA)) {
+ electionSuccess = false;
+ }
+ }
+ if (!electionSuccess) {
+ Thread.sleep(1000);
+ }
+ }
+ Assertions.assertTrue(electionSuccess);
+ Assertions.assertNotNull(masterNode);
+ Assertions.assertNotNull(masterEnvironment);
+ return Pair.of(masterEnvironment, masterNode);
+ }
+
+ // @Test
+ @RepeatedTest(1)
+ public void testRollbackException() throws Exception {
+ LOG.info("start");
+ List<Pair<BDBEnvironment, NodeInfo>> followersInfo = new ArrayList<>();
+
+ int masterPort = findValidPort();
+ String masterNodeName = "fe1";
+ String masterNodeHostPort = "127.0.0.1:" + masterPort;
+
+ BDBEnvironment masterEnvironment = new BDBEnvironment(true, false);
+ String masterDir = createTmpDir();
+ masterEnvironment.setup(new File(masterDir), masterNodeName,
masterNodeHostPort, masterNodeHostPort);
+ followersInfo.add(Pair.of(masterEnvironment, new
NodeInfo(masterNodeName, masterNodeHostPort, masterDir)));
+
+ for (int i = 2; i <= 3; i++) {
+ int nodePort = findValidPort();
+ String nodeName = "fe" + i;
+ String nodeHostPort = "127.0.0.1:" + nodePort;
+
+ BDBEnvironment followerEnvironment = new BDBEnvironment(true,
false);
+ String nodeDir = createTmpDir();
+ followerEnvironment.setup(new File(nodeDir), nodeName,
nodeHostPort, masterNodeHostPort);
+ followersInfo.add(Pair.of(followerEnvironment, new
NodeInfo(nodeName, nodeHostPort, nodeDir)));
+ }
+
+ Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
+ String beginDbName = String.valueOf(0L);
+ Database masterDb = masterPair.first.openDatabase(beginDbName);
+ DatabaseEntry key = new DatabaseEntry(randomBytes());
+ DatabaseEntry value = new DatabaseEntry(randomBytes());
+ Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null,
key, value));
+ Assertions.assertEquals(1,
masterEnvironment.getDatabaseNames().size());
+ LOG.info("master is {} | {}", masterPair.second.name,
masterPair.second.dir);
+
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.second.dir.equals(masterPair.second.dir)) {
+ LOG.info("skip {}", entryPair.second.name);
+ return;
+ }
+
+ Assertions.assertEquals(1,
entryPair.first.getDatabaseNames().size());
+ Database followerDb = entryPair.first.openDatabase(beginDbName);
+ DatabaseEntry readValue = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
+ followerDb.close();
+ }
+
+ masterDb.close();
+ masterEnvironment.getEpochDB().close();
+
+ followersInfo.stream().forEach(entryPair -> {
+ entryPair.first.close();
+ LOG.info("close {} | {}", entryPair.second.name,
entryPair.second.dir);
+ });
+
+ // all follower closed
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ String followerCopyDir = entryPair.second.dir + "_copy";
+ LOG.info("Copy from {} to {}", entryPair.second.dir,
followerCopyDir);
+ FileUtils.copyDirectory(new File(entryPair.second.dir), new
File(followerCopyDir));
+ }
+
+ followersInfo.stream().forEach(entryPair -> {
+ entryPair.first.openReplicatedEnvironment(new
File(entryPair.second.dir));
+ LOG.info("open {} | {}", entryPair.second.name,
entryPair.second.dir);
+ });
+
+ masterPair = findMaster(followersInfo);
+
+ masterDb = masterPair.first.openDatabase(String.valueOf(1L));
+ for (int i = 0; i < 2 * Config.txn_rollback_limit + 10; i++) {
+ // for (int i = 0; i < 10; i++) {
+ OperationStatus status = masterDb.put(null, new
DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes()));
+ Assertions.assertEquals(OperationStatus.SUCCESS, status);
+ }
+ Assertions.assertEquals(2, masterPair.first.getDatabaseNames().size());
+ Assertions.assertEquals(0, masterPair.first.getDatabaseNames().get(0));
+ Assertions.assertEquals(1, masterPair.first.getDatabaseNames().get(1));
+
+ followersInfo.stream().forEach(entryPair -> {
+ entryPair.first.close();
+ LOG.info("close {} | {}", entryPair.second.name,
entryPair.second.dir);
+ });
+
+ // Restore follower's (not new master) bdbje dir
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.second.dir.equals(masterDir)) {
+ String masterCopyDir = entryPair.second.dir + "_copy";
+ FileUtils.deleteDirectory(new File(masterCopyDir));
+ continue;
+ }
+ LOG.info("Delete followerDir {} ", entryPair.second.dir);
+ FileUtils.deleteDirectory(new File(entryPair.second.dir));
+ // FileUtils.moveDirectory(new File(entryPair.second.dir), new
File(entryPair.second.dir + "_copy2"));
+ String followerCopyDir = entryPair.second.dir + "_copy";
+ LOG.info("Move {} to {}", followerCopyDir, entryPair.second.dir);
+ FileUtils.moveDirectory(new File(followerCopyDir), new
File(entryPair.second.dir));
+ }
+
+ Thread.sleep(1000);
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.second.dir.equals(masterPair.second.dir)) {
+ LOG.info("skip open {} | {}", entryPair.second.name,
entryPair.second.dir);
+ continue;
+ }
+ entryPair.first.openReplicatedEnvironment(new
File(entryPair.second.dir));
+ LOG.info("open {} | {}", entryPair.second.name,
entryPair.second.dir);
+ }
+
+ BDBEnvironment newMasterEnvironment = null;
+ boolean found = false;
+ for (int i = 0; i < 300; i++) {
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.second.dir.equals(masterPair.second.dir)) {
+ continue;
+ }
+
+ LOG.info("name:{} state:{} dir:{}",
entryPair.first.getReplicatedEnvironment().getNodeName(),
+ entryPair.first.getReplicatedEnvironment().getState(),
+ entryPair.second.dir);
+ if
(entryPair.first.getReplicatedEnvironment().getState().equals(ReplicatedEnvironment.State.MASTER))
{
+ newMasterEnvironment = entryPair.first;
+ found = true;
+ break;
+ }
+ }
+ if (found) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assertions.assertNotNull(newMasterEnvironment);
+
+ masterDb = newMasterEnvironment.openDatabase(beginDbName);
+ Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null,
new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes())));
+ Assertions.assertEquals(1,
newMasterEnvironment.getDatabaseNames().size());
+ // // old master
+ masterEnvironment.openReplicatedEnvironment(new File(masterDir));
+ followersInfo.stream().forEach(entryPair -> {
+ entryPair.first.close();
+ LOG.info("close {} | {}", entryPair.second.name,
entryPair.second.dir);
+ });
+ LOG.info("end");
+ }
+
+ @RepeatedTest(1)
+ public void testGetSyncPolicy() throws Exception {
+ Assertions.assertEquals(Durability.SyncPolicy.NO_SYNC,
+ Deencapsulation.invoke(BDBEnvironment.class, "getSyncPolicy",
"NO_SYNC"));
+
+ Assertions.assertEquals(Durability.SyncPolicy.SYNC,
+ Deencapsulation.invoke(BDBEnvironment.class, "getSyncPolicy",
"SYNC"));
+
+ Assertions.assertEquals(Durability.SyncPolicy.WRITE_NO_SYNC,
+ Deencapsulation.invoke(BDBEnvironment.class, "getSyncPolicy",
"default"));
+ }
+
+ @RepeatedTest(1)
+ public void testGetAckPolicy() throws Exception {
+ Assertions.assertEquals(Durability.ReplicaAckPolicy.ALL,
+ Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy",
"ALL"));
+
+ Assertions.assertEquals(Durability.ReplicaAckPolicy.NONE,
+ Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy",
"NONE"));
+
+ Assertions.assertEquals(Durability.ReplicaAckPolicy.SIMPLE_MAJORITY,
+ Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy",
"default"));
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
index 93522a5308e..ba81d6697ba 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
@@ -36,8 +36,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-// import org.junit.jupiter.api.RepeatedTest; only for debug
+import org.junit.jupiter.api.RepeatedTest;
import java.io.DataOutput;
import java.io.File;
@@ -74,7 +73,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
@AfterAll
public static void cleanUp() throws Exception {
for (File dir : tmpDirs) {
- LOG.info("deleteTmpDir path {}", dir.getAbsolutePath());
+ LOG.debug("deleteTmpDir path {}", dir.getAbsolutePath());
FileUtils.deleteDirectory(dir);
}
}
@@ -98,8 +97,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
return port;
}
- // @RepeatedTest(100) only for debug
- @Test
+ @RepeatedTest(1)
public void testNormal() throws Exception {
int port = findValidPort();
Preconditions.checkArgument(((port > 0) && (port < 65535)));
@@ -197,15 +195,17 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
Assertions.assertEquals(5, journal.getDatabaseNames().size());
Assertions.assertEquals(41, journal.getDatabaseNames().get(4));
- JournalCursor cursor = journal.read(1, 50);
+ JournalCursor cursor = journal.read(1, 51);
Assertions.assertNotNull(cursor);
- for (int i = 1; i < 50; i++) {
+ for (int i = 0; i < 50; i++) {
Pair<Long, JournalEntity> kv = cursor.next();
Assertions.assertNotNull(kv);
JournalEntity entity = kv.second;
Assertions.assertEquals(OperationType.OP_TIMESTAMP,
entity.getOpCode());
}
+ Assertions.assertEquals(null, cursor.next());
+
journal.close();
Assertions.assertEquals(null, journal.getBDBEnvironment());
@@ -223,7 +223,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
journal.deleteJournals(21);
- LOG.info("journal.getDatabaseNames(): {}", journal.getDatabaseNames());
+ LOG.debug("journal.getDatabaseNames(): {}",
journal.getDatabaseNames());
Assertions.assertEquals(3, journal.getDatabaseNames().size());
Assertions.assertEquals(21, journal.getDatabaseNames().get(0));
journal.close();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java
new file mode 100644
index 00000000000..d68d616574c
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.journal.bdbje;
+
+import org.apache.doris.catalog.Env;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.sleepycat.je.Database;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class BDBJournalCursorTest {
+ private static final Logger LOG =
LogManager.getLogger(BDBEnvironmentTest.class);
+ private static List<String> tmpDirs = new ArrayList<>();
+
+ public static String createTmpDir() throws Exception {
+ String dorisHome = System.getenv("DORIS_HOME");
+ if (Strings.isNullOrEmpty(dorisHome)) {
+ dorisHome =
Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString();
+ }
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome));
+ Path mockDir = Paths.get(dorisHome, "fe", "mocked");
+ if (!Files.exists(mockDir)) {
+ Files.createDirectories(mockDir);
+ }
+ UUID uuid = UUID.randomUUID();
+ File dir = Files.createDirectories(Paths.get(dorisHome, "fe",
"mocked", "BDBEnvironmentTest-" + uuid.toString())).toFile();
+ LOG.debug("createTmpDir path {}", dir.getAbsolutePath());
+ tmpDirs.add(dir.getAbsolutePath());
+ return dir.getAbsolutePath();
+ }
+
+ @AfterAll
+ public static void cleanUp() throws Exception {
+ for (String dir : tmpDirs) {
+ LOG.debug("deleteTmpDir path {}", dir);
+ FileUtils.deleteDirectory(new File(dir));
+ }
+ }
+
+ private int findValidPort() {
+ int port = 0;
+ for (int i = 0; i < 65535; i++) {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ socket.setReuseAddress(true);
+ port = socket.getLocalPort();
+ try (DatagramSocket datagramSocket = new DatagramSocket(port))
{
+ datagramSocket.setReuseAddress(true);
+ break;
+ } catch (SocketException e) {
+ LOG.info("The port {} is invalid and try another port",
port);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Could not find a free TCP/IP
port");
+ }
+ }
+ Preconditions.checkArgument(((port > 0) && (port < 65536)));
+ return port;
+ }
+
+ @RepeatedTest(1)
+ public void testNormal() throws Exception {
+ Assertions.assertTrue(BDBJournalCursor.getJournalCursor(null, -1, 20)
== null);
+ Assertions.assertTrue(BDBJournalCursor.getJournalCursor(null, 21, 20)
== null);
+
+ int port = findValidPort();
+ String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ String selfNodeHostPort = "127.0.0.1:" + port;
+ LOG.debug("selfNodeName:{}, selfNodeHostPort:{}", selfNodeName,
selfNodeHostPort);
+
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(new File(createTmpDir()), selfNodeName,
selfNodeHostPort, selfNodeHostPort);
+
+ Database db = bdbEnvironment.openDatabase("1");
+ db.close();
+
+ BDBJournalCursor bdbJournalCursor =
BDBJournalCursor.getJournalCursor(bdbEnvironment, 1, 10);
+ Assertions.assertTrue(bdbJournalCursor != null);
+ Assertions.assertTrue(bdbJournalCursor.next() == null);
+
+ bdbEnvironment.close();
+
+ bdbJournalCursor = BDBJournalCursor.getJournalCursor(bdbEnvironment,
1, 10);
+ Assertions.assertTrue(bdbJournalCursor == null);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolOptionsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolOptionsTest.java
similarity index 93%
rename from
fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolOptionsTest.java
rename to
fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolOptionsTest.java
index d6d93fa1332..426f5e3e72b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolOptionsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolOptionsTest.java
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.bdb;
+package org.apache.doris.journal.bdbje;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.journal.bdbje.BDBToolOptions;
import org.junit.Assert;
import org.junit.Test;
@@ -36,6 +35,7 @@ public class BDBToolOptionsTest {
Assert.assertTrue(options.hasFromKey());
Assert.assertTrue(options.hasEndKey());
Assert.assertNotSame(FeConstants.meta_version,
options.getMetaVersion());
+ Assert.assertTrue(options.toString().contains("12345"));
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolTest.java
similarity index 97%
rename from fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolTest.java
rename to
fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolTest.java
index f9693fcd70e..5940935a316 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolTest.java
@@ -15,12 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.bdb;
+package org.apache.doris.journal.bdbje;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.journal.JournalEntity;
-import org.apache.doris.journal.bdbje.BDBTool;
-import org.apache.doris.journal.bdbje.BDBToolOptions;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.ReplicaPersistInfo;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java
new file mode 100644
index 00000000000..36c4bcacb5a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.journal.bdbje;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class TimestampTest {
+ private static final Logger LOG =
LogManager.getLogger(TimestampTest.class);
+ private static List<String> testFiles = new ArrayList<>();
+
+ public static String createTestFile() throws Exception {
+ String dorisHome = System.getenv("DORIS_HOME");
+ if (Strings.isNullOrEmpty(dorisHome)) {
+ dorisHome =
Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString();
+ }
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome));
+ Path mockDir = Paths.get(dorisHome, "fe", "mocked");
+ if (!Files.exists(mockDir)) {
+ Files.createDirectories(mockDir);
+ }
+ UUID uuid = UUID.randomUUID();
+ File testFile = Files.createFile(Paths.get(dorisHome, "fe", "mocked",
"TimestampTest-" + uuid.toString())).toFile();
+ LOG.debug("createTmpFile path {}", testFile.getAbsolutePath());
+ testFiles.add(testFile.getAbsolutePath());
+ return testFile.getAbsolutePath();
+ }
+
+ @AfterAll
+ public static void cleanUp() throws Exception {
+ for (String testFile : testFiles) {
+ LOG.info("delete testFile path {}", testFile);
+ Files.deleteIfExists(Paths.get(testFile));
+ }
+ }
+
+ // @Test
+ @RepeatedTest(1)
+ public void testSerialization() throws Exception {
+ Timestamp timestamp = new Timestamp();
+ long ts = timestamp.getTimestamp();
+ Assertions.assertTrue(ts > 0);
+
+ File testFile = new File(createTestFile());
+ DataOutputStream out = new DataOutputStream(new
FileOutputStream(testFile));
+ timestamp.write(out);
+ out.flush();
+ out.close();
+
+ DataInputStream in = new DataInputStream(new
FileInputStream(testFile));
+ Thread.sleep(1000);
+ Timestamp timestamp2 = new Timestamp();
+ timestamp2.readFields(in);
+
+ Assertions.assertEquals(ts, timestamp2.getTimestamp());
+ Assertions.assertEquals("" + ts, timestamp2.toString());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]