This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch double_live
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/double_live by this push:
     new 04b36aa128 [To double_live] DoubleWrite for 0.13 (#5311)
04b36aa128 is described below

commit 04b36aa128fb3bd24dd712194148945673741c8b
Author: YongzaoDan <[email protected]>
AuthorDate: Tue Apr 12 12:45:34 2022 +0800

    [To double_live] DoubleWrite for 0.13 (#5311)
---
 docs/UserGuide/API/Programming-Java-Native-API.md  |   1 +
 .../UserGuide/API/Programming-Java-Native-API.md   |   3 +-
 example/doublewrite/pom.xml                        |  68 +++++++
 .../doublewrite/DoubleWriteDegradationRate.java    |  75 ++++++++
 .../iotdb/doublewrite/DoubleWriteDurability.java   |  64 ++++++
 .../apache/iotdb/doublewrite/DoubleWriteEmpty.java |  19 +-
 .../iotdb/doublewrite/DoubleWriteExample.java      |  76 ++++++++
 .../iotdb/doublewrite/DoubleWriteThread.java       |  98 ++++++++++
 .../apache/iotdb/doublewrite/DoubleWriteUtil.java  | 115 +++++++++++
 example/pom.xml                                    |   1 +
 .../java/org/apache/iotdb/SessionPoolExample.java  |  42 ++--
 integration/pom.xml                                |   7 +
 .../session/IoTDBSessionDisableMemControlIT.java   |   0
 .../session/IoTDBSessionVectorABDeviceIT.java      |   0
 .../session/IoTDBSessionVectorAggregationIT.java   |   0
 .../IoTDBSessionVectorAggregationWithUnSeqIT.java  |   0
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |   0
 .../apache/iotdb/session/SessionCacheLeaderUT.java |   0
 .../java/org/apache/iotdb/session/SessionTest.java |   0
 .../apache/iotdb/session/pool/SessionPoolTest.java |   0
 .../apache/iotdb/session/template/TemplateUT.java  |   0
 .../apache/iotdb/session/util/ThreadUtilsTest.java |   0
 server/pom.xml                                     |   5 +
 .../resources/conf/iotdb-engine.properties         |  57 ++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 124 ++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   3 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  55 ++++++
 .../iotdb/db/doublewrite/DoubleWriteConsumer.java  | 103 ++++++++++
 .../db/doublewrite/DoubleWriteEProtector.java      |  81 ++++++++
 .../db/doublewrite/DoubleWriteLogService.java      | 214 +++++++++++++++++++++
 .../db/doublewrite/DoubleWriteNIProtector.java     |  54 ++++++
 .../db/doublewrite/DoubleWritePlanTypeUtils.java   |  54 ++++++
 .../iotdb/db/doublewrite/DoubleWriteProducer.java  |  54 ++++++
 .../iotdb/db/doublewrite/DoubleWriteProtector.java | 169 ++++++++++++++++
 .../iotdb/db/doublewrite/DoubleWriteTask.java      |  85 ++++++++
 .../db/service/thrift/impl/TSServiceImpl.java      | 165 +++++++++++++++-
 .../db/doublewrite/DoubleWriteManualTestUtils.java | 107 +++++++++++
 session/pom.xml                                    |  13 --
 .../java/org/apache/iotdb/session/Session.java     |  86 ++++++---
 .../apache/iotdb/session/SessionConnection.java    |  20 ++
 .../org/apache/iotdb/session/pool/SessionPool.java | 201 +++++++++++++++++--
 thrift/src/main/thrift/rpc.thrift                  |   8 +
 42 files changed, 2147 insertions(+), 80 deletions(-)

diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md 
b/docs/UserGuide/API/Programming-Java-Native-API.md
index 90b4c1e4a0..260ef3c76a 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -514,6 +514,7 @@ If you can not get a session connection in 60 seconds, 
there is a warning log bu
 If a session has finished an operation, it will be put back to the pool 
automatically.
 If a session connection is broken, the session will be removed automatically 
and the pool will try 
 to create a new session and redo the operation.
+You can also specify a URL list of multiple reachable nodes when creating a 
SessionPool, just as you would when creating a Session, to ensure high 
availability of clients in a distributed cluster.
 
 For query operations:
 
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md 
b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index 5edd930d1f..1f7e378504 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -498,7 +498,8 @@ void testInsertTablets(Map<String, Tablet> tablets)
 如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
 
 当一个连接被用完后,他会自动返回池中等待下次被使用;
-当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作;
+你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。
 
 对于查询操作:
 
diff --git a/example/doublewrite/pom.xml b/example/doublewrite/pom.xml
new file mode 100644
index 0000000000..238f43224f
--- /dev/null
+++ b/example/doublewrite/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-examples</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>0.13.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>doublewrite-example</artifactId>
+    <name>double-write-example</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-cli</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+</project>
diff --git 
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
new file mode 100644
index 0000000000..df3e195cff
--- /dev/null
+++ 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+/**
+ * This is a double write insertion degradation rate test Java class, which 
shows the performance
+ * impact of enabling the double write feature. You can run this code in the same 
way as
+ * DoubleWriteExample.java. Since IoTDB-A enables the double write feature, the 
performance impact is
+ * accurate only when both IoTDB-A and IoTDB-B run on the same computer. Alternatively, you 
can modify the
+ * default configuration of IoTDB-A and IoTDB-B after becoming familiar with 
DoubleWriteExample and
+ * DoubleWriteUtil to get a more accurate performance impact estimate from two 
remote computers.
+ */
+public class DoubleWriteDegradationRate extends DoubleWriteUtil {
+
+  private static final String dA = "d0";
+  private static final String dB = "d1";
+
+  /**
+   * The following three fields are insert configuration parameters. The 
double write feature
+   * already applies to all write interfaces, so you are free to modify these 
parameters.
+   */
+  // Total insertion requests during test
+  private static final int batchCnt = 100000;
+  // The insertion timeseries count per timestamp
+  private static final int timeseriesCnt = 100;
+  // The insertion rows count per request
+  private static final int batchSize = 1;
+
+  public static void main(String[] args) throws Exception {
+    initEnvironment();
+    initSessionPool(dA, dB, batchCnt, timeseriesCnt, batchSize);
+    insertData();
+    cleanEnvironment();
+  }
+
+  private static void insertData() throws InterruptedException {
+    // do insertion while measuring performance
+    long startTime = System.currentTimeMillis();
+    threadA.start();
+    threadA.join();
+    double doubleWriteCost = System.currentTimeMillis() - startTime;
+
+    startTime = System.currentTimeMillis();
+    threadB.start();
+    threadB.join();
+    double normalWriteCost = System.currentTimeMillis() - startTime;
+
+    // compute performance
+    double total = batchCnt * batchSize;
+    System.out.println("Normal write cost: " + normalWriteCost / 1000.0 + "s");
+    System.out.println("Average: " + normalWriteCost / total + " ms per 
insertion");
+    System.out.println("Double write cost: " + doubleWriteCost / 1000.0 + "s");
+    System.out.println("Average: " + doubleWriteCost / total + " ms per 
insertion");
+    System.out.println(
+        "Performance degradation rate : "
+            + (doubleWriteCost - normalWriteCost) / normalWriteCost * 100.0
+            + "%");
+  }
+}
diff --git 
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
new file mode 100644
index 0000000000..5cfafb4a83
--- /dev/null
+++ 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.session.pool.SessionPool;
+
+public class DoubleWriteDurability extends DoubleWriteUtil {
+
+  private static final String dA = "d0";
+  private static final String dB = "d1";
+
+  private static final int batchCnt = 3000;
+  private static final int timeseriesCnt = 1000;
+  private static final int batchSize = 1;
+
+  public static void main(String[] args) throws Exception {
+    initEnvironment();
+
+    sessionPoolA = new SessionPool(ipA, portA, userA, passwordA, concurrency);
+    // Create StorageGroups
+    try {
+      sessionPoolA.deleteStorageGroup(sg);
+    } catch (Exception ignored) {
+      // ignored
+    }
+    sessionPoolA.setStorageGroup(sg);
+
+    // Create double write threads
+    DoubleWriteThread doubleWriteThreadA =
+        new DoubleWriteThread(sessionPoolA, dA, batchCnt, timeseriesCnt, 
batchSize);
+    threadA = new Thread(doubleWriteThreadA);
+    DoubleWriteThread doubleWriteThreadB =
+        new DoubleWriteThread(sessionPoolA, dB, batchCnt, timeseriesCnt, 
batchSize);
+    threadB = new Thread(doubleWriteThreadB);
+
+    insertData();
+  }
+
+  private static void insertData() throws InterruptedException {
+    threadA.start();
+    threadB.start();
+
+    threadA.join();
+    threadB.join();
+
+    System.out.println("Insertion complete.");
+  }
+}
diff --git 
a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
similarity index 62%
copy from 
session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
copy to 
example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
index 202e60b89b..73ff3a2fee 100644
--- a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
+++ 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,20 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.doublewrite;
 
-package org.apache.iotdb.session.util;
+public class DoubleWriteEmpty extends DoubleWriteUtil {
 
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.ThreadFactory;
-
-public class ThreadUtilsTest {
-
-  @Test
-  public void createThreadFactory() {
-    ThreadFactory daemonThreadFactory = 
ThreadUtils.createThreadFactory("Test", true);
-    Thread thread = daemonThreadFactory.newThread(() -> {});
-    Assert.assertEquals("Test-0", thread.getName());
+  public static void main(String[] args) {
+    initEnvironment();
   }
 }
diff --git 
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
new file mode 100644
index 0000000000..fc363b1015
--- /dev/null
+++ 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+
+/**
+ * This is a simple double write example java class, which shows how to enable 
double write feature
+ * and proves that the double write feature performs correctly. To run this 
code, you need to start
+ * another IoTDB service. The easiest way to do this is to copy the
+ * ./server/target/iotdb-server-0.12.5-SNAPSHOT folder to any location. Then 
change the default port
+ * in iotdb-engine.properties to 6668, and start the service using the 
start-server script. Or you
+ * can start IoTDB-B on another computer and modify the configuration of 
IoTDB-B in DoubleWriteUtil.
+ * Finally, you can run this code and see double write feature from the 
command line.
+ */
+public class DoubleWriteExample extends DoubleWriteUtil {
+
+  private static final String dA = "d0";
+  private static final String dB = "d0";
+
+  private static final int batchCnt = 10;
+  private static final int timeseriesCnt = 1;
+  private static final int batchSize = 1;
+
+  private static final String sql = "select * from root.DOUBLEWRITESG.d0";
+
+  public static void main(String[] args) throws Exception {
+    initEnvironment();
+    initSessionPool(dA, dB, batchCnt, timeseriesCnt, batchSize);
+    insertData();
+    queryData();
+    cleanEnvironment();
+  }
+
+  private static void insertData() throws InterruptedException {
+    threadA.start();
+    threadB.start();
+
+    threadA.join();
+    threadB.join();
+  }
+
+  private static void queryData() throws IoTDBConnectionException, 
StatementExecutionException {
+    System.out.println("Data in IoTDB-A:");
+    // select data from IoTDB-A
+    SessionDataSetWrapper wrapper = sessionPoolA.executeQueryStatement(sql);
+    while (wrapper.hasNext()) {
+      System.out.println(wrapper.next());
+    }
+
+    // select data from IoTDB-B
+    System.out.println("Data in IoTDB-B:");
+    wrapper = sessionPoolB.executeQueryStatement(sql);
+    while (wrapper.hasNext()) {
+      System.out.println(wrapper.next());
+    }
+  }
+}
diff --git 
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
new file mode 100644
index 0000000000..9e9a2a49ed
--- /dev/null
+++ 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Thread for insertion. Normally you don't need to modify this java class. */
+public class DoubleWriteThread implements Runnable {
+
+  private final SessionPool sessionPool;
+
+  private final String sg = "root.DOUBLEWRITESG";
+  private final String deviceId;
+
+  private final int batchCnt;
+  private final int timeseriesCnt;
+  private final int batchSize;
+
+  DoubleWriteThread(
+      SessionPool sessionPool, String deviceId, int batchCnt, int 
timeseriesCnt, int batchSize)
+      throws IoTDBConnectionException, StatementExecutionException {
+    this.sessionPool = sessionPool;
+
+    this.deviceId = deviceId;
+
+    this.batchCnt = batchCnt;
+    this.timeseriesCnt = timeseriesCnt;
+    this.batchSize = batchSize;
+
+    for (int i = 0; i < timeseriesCnt; i++) {
+      sessionPool.createTimeseries(
+          sg + "." + deviceId + "." + "s" + i,
+          TSDataType.INT32,
+          TSEncoding.PLAIN,
+          CompressionType.UNCOMPRESSED);
+    }
+  }
+
+  @Override
+  public void run() {
+    long timestamp = 0;
+    for (int i = 0; i < batchCnt; i++) {
+      List<String> deviceList = new ArrayList<>();
+      List<Long> timestampList = new ArrayList<>();
+      List<List<String>> measurementList = new ArrayList<>();
+      List<List<String>> valueList = new ArrayList<>();
+
+      for (int j = 0; j < batchSize; j++) {
+        deviceList.add(sg + "." + deviceId);
+        timestampList.add(timestamp);
+        List<String> measurements = new ArrayList<>();
+        List<String> values = new ArrayList<>();
+        for (int k = 0; k < timeseriesCnt; k++) {
+          measurements.add("s" + k);
+          values.add(String.valueOf(timestamp));
+        }
+        measurementList.add(measurements);
+        valueList.add(values);
+        timestamp += 1;
+      }
+
+      try {
+        if (batchSize == 1) {
+          sessionPool.insertRecord(
+              deviceList.get(0), timestampList.get(0), measurementList.get(0), 
valueList.get(0));
+        } else {
+          sessionPool.insertRecords(deviceList, timestampList, 
measurementList, valueList);
+        }
+      } catch (IoTDBConnectionException | StatementExecutionException ignored) 
{
+        // ignored
+      }
+    }
+  }
+}
diff --git 
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
new file mode 100644
index 0000000000..be65248a32
--- /dev/null
+++ 
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+/**
+ * This Java class is used to set up the environment for the double write examples. 
You can set the IoTDB-B
+ * config here.
+ */
+public abstract class DoubleWriteUtil {
+
+  // IoTDB-A config
+  // Started by EnvironmentUtils, shouldn't be modified
+  protected static final String ipA = "127.0.0.1";
+  protected static final int portA = 6667;
+  protected static final String userA = "root";
+  protected static final String passwordA = "root";
+
+  // IoTDB-B config
+  // You can modify this config in order to connect to the IoTDB-B you started
+  protected static final String ipB = "127.0.0.1";
+  protected static final int portB = 6668;
+  protected static final String userB = "root";
+  protected static final String passwordB = "root";
+
+  protected static SessionPool sessionPoolA;
+  protected static SessionPool sessionPoolB;
+  // The sessionPool concurrency
+  protected static final int concurrency = 5;
+
+  // Default name of StorageGroup
+  protected static final String sg = "root.DOUBLEWRITESG";
+
+  // Threads for double write
+  protected static Thread threadA;
+  protected static Thread threadB;
+
+  protected static void initEnvironment() {
+    // Start local IoTDB-A on ip "127.0.0.1", port 6667 and set 
enableDoubleWrite
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+    config.setEnableDoubleWrite(true);
+    config.setSecondaryAddress(ipB);
+    config.setSecondaryPort(portB);
+    config.setSecondaryUser(userB);
+    config.setSecondaryPassword(passwordB);
+    config.setDoubleWriteMaxLogSize(1024);
+
+    EnvironmentUtils.envSetUp();
+  }
+
+  protected static void initSessionPool(
+      String dA, String dB, int batchCnt, int timeseriesCnt, int batchSize)
+      throws IoTDBConnectionException, StatementExecutionException {
+    // Create sessionPools
+    sessionPoolA = new SessionPool(ipA, portA, userA, passwordA, concurrency);
+    sessionPoolB = new SessionPool(ipB, portB, userB, passwordB, concurrency);
+
+    // Create StorageGroups
+    try {
+      sessionPoolA.deleteStorageGroup(sg);
+    } catch (Exception ignored) {
+      // ignored
+    }
+    try {
+      sessionPoolB.deleteStorageGroup(sg);
+    } catch (Exception ignored) {
+      // ignored
+    }
+    sessionPoolA.setStorageGroup(sg);
+    sessionPoolB.setStorageGroup(sg);
+
+    // Create double write threads
+    DoubleWriteThread doubleWriteThreadA =
+        new DoubleWriteThread(sessionPoolA, dA, batchCnt, timeseriesCnt, 
batchSize);
+    threadA = new Thread(doubleWriteThreadA);
+    DoubleWriteThread doubleWriteThreadB =
+        new DoubleWriteThread(sessionPoolB, dB, batchCnt, timeseriesCnt, 
batchSize);
+    threadB = new Thread(doubleWriteThreadB);
+  }
+
+  protected static void cleanEnvironment() throws Exception {
+    // Clean StorageGroups, close sessionPools and shut down environment
+    sessionPoolA.deleteStorageGroup("root.DOUBLEWRITESG");
+    sessionPoolB.deleteStorageGroup("root.DOUBLEWRITESG");
+
+    sessionPoolA.close();
+    sessionPoolB.close();
+
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.shutdownDaemon();
+  }
+}
diff --git a/example/pom.xml b/example/pom.xml
index e099ceeb1f..aced1b0801 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -47,6 +47,7 @@
         <module>udf</module>
         <module>trigger</module>
         <module>rabbitmq</module>
+        <module>doublewrite</module>
     </modules>
     <build>
         <pluginManagement>
diff --git 
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 230849d25d..23a1895c22 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -32,12 +32,12 @@ import java.util.concurrent.Executors;
 
 public class SessionPoolExample {
 
-  private static SessionPool pool;
+  private static SessionPool sessionPool;
   private static ExecutorService service;
 
-  public static void main(String[] args)
-      throws StatementExecutionException, IoTDBConnectionException, 
InterruptedException {
-    pool =
+  /** Build a custom SessionPool for this example */
+  private static void constructCustomSessionPool() {
+    sessionPool =
         new SessionPool.Builder()
             .host("127.0.0.1")
             .port(6667)
@@ -45,13 +45,33 @@ public class SessionPoolExample {
             .password("root")
             .maxSize(3)
             .build();
-    service = Executors.newFixedThreadPool(10);
+  }
 
+  /** Build a redirect-able SessionPool for this example */
+  private static void constructRedirectSessionPool() {
+    List<String> nodeUrls = new ArrayList<>();
+    nodeUrls.add("127.0.0.1:6667");
+    nodeUrls.add("127.0.0.1:6668");
+    sessionPool =
+        new SessionPool.Builder()
+            .nodeUrls(nodeUrls)
+            .user("root")
+            .password("root")
+            .maxSize(3)
+            .build();
+  }
+
+  public static void main(String[] args)
+      throws StatementExecutionException, IoTDBConnectionException, 
InterruptedException {
+    // Choose the SessionPool you are going to use
+    constructRedirectSessionPool();
+
+    service = Executors.newFixedThreadPool(10);
     insertRecord();
     queryByRowRecord();
     Thread.sleep(1000);
     queryByIterator();
-    pool.close();
+    sessionPool.close();
     service.shutdown();
   }
 
@@ -72,7 +92,7 @@ public class SessionPoolExample {
       values.add(1L);
       values.add(2L);
       values.add(3L);
-      pool.insertRecord(deviceId, time, measurements, types, values);
+      sessionPool.insertRecord(deviceId, time, measurements, types, values);
     }
   }
 
@@ -82,7 +102,7 @@ public class SessionPoolExample {
           () -> {
             SessionDataSetWrapper wrapper = null;
             try {
-              wrapper = pool.executeQueryStatement("select * from 
root.sg1.d1");
+              wrapper = sessionPool.executeQueryStatement("select * from 
root.sg1.d1");
               System.out.println(wrapper.getColumnNames());
               System.out.println(wrapper.getColumnTypes());
               while (wrapper.hasNext()) {
@@ -92,7 +112,7 @@ public class SessionPoolExample {
               e.printStackTrace();
             } finally {
               // remember to close data set finally!
-              pool.closeResultSet(wrapper);
+              sessionPool.closeResultSet(wrapper);
             }
           });
     }
@@ -104,7 +124,7 @@ public class SessionPoolExample {
           () -> {
             SessionDataSetWrapper wrapper = null;
             try {
-              wrapper = pool.executeQueryStatement("select * from 
root.sg1.d1");
+              wrapper = sessionPool.executeQueryStatement("select * from 
root.sg1.d1");
               // get DataIterator like JDBC
               DataIterator dataIterator = wrapper.iterator();
               System.out.println(wrapper.getColumnNames());
@@ -120,7 +140,7 @@ public class SessionPoolExample {
               e.printStackTrace();
             } finally {
               // remember to close data set finally!
-              pool.closeResultSet(wrapper);
+              sessionPool.closeResultSet(wrapper);
             }
           });
     }
diff --git a/integration/pom.xml b/integration/pom.xml
index f014163b7c..4ddd7e8c66 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -34,6 +34,13 @@
             <artifactId>iotdb-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>iotdb-cluster</artifactId>
diff --git 
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
 
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
 
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
 
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
 
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
 
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java 
b/integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionTest.java 
b/integration/src/test/java/org/apache/iotdb/session/SessionTest.java
similarity index 100%
rename from session/src/test/java/org/apache/iotdb/session/SessionTest.java
rename to integration/src/test/java/org/apache/iotdb/session/SessionTest.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java 
b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
rename to 
integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/template/TemplateUT.java 
b/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
rename to 
integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
diff --git 
a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java 
b/integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
similarity index 100%
rename from 
session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
rename to 
integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
diff --git a/server/pom.xml b/server/pom.xml
index 0cfec2bd3e..4d58bcdaf8 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -45,6 +45,11 @@
             <artifactId>iotdb-antlr</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>tsfile</artifactId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index a4a02484ee..ae448c09b4 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -926,3 +926,60 @@ timestamp_precision=ms
 ####################
 # Datatype: float
 # group_by_fill_cache_size_in_mb=1.0
+
+####################
+### Double Write Configuration
+####################
+
+# Whether DoubleWrite is enabled
+# Datatype: boolean
+# enable_double_write=false
+
+# Secondary IoTDB config
+# Datatype: String
+# secondary_address=127.0.0.1
+
+# Datatype: int
+# secondary_port=6668
+
+# Datatype: String
+# secondary_user=root
+
+# Datatype: String
+# secondary_password=root
+
+# The transmitting concurrency size of DoubleWrite SessionPool
+# It is recommended that this parameter not exceed the sum of the concurrent DoubleWriteTasks (or DoubleWriteConsumers) and the concurrent DoubleWriteProtectors
+# Datatype: int
+# double_write_session_concurrency_size = 8
+
+# DoubleWriteLog dir
+# If this property is unset, DoubleWrite module will save the DoubleWriteLogs 
in the default relative path directory under the IoTDB folder(i.e., 
%IOTDB_HOME%/data/doublewrite).
+# If it is absolute, the DoubleWrite module will save the data in the exact location it points to.
+# If it is relative, DoubleWrite module will save the data in the relative 
path directory it indicates under the IoTDB folder.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is 
"\\\\", then the path is absolute. Otherwise, it is relative.
+# double_write_log_dir=data\\doublewrite
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# double_write_log_dir=data/doublewrite
+
+# The validity of each DoubleWriteLog, in seconds. A log file older than this is closed and handed over for re-transmission
+# Datatype: int
+# double_write_log_file_validity=30
+
+# The number of DoubleWriteLog ids: log files are numbered from 0 to double_write_log_file_num - 1 and the ids are reused cyclically
+# Datatype: int
+# double_write_log_file_num = 32767
+
+# The max total size of all the DoubleWriteLogs, in bytes. Default is 100GB
+# Datatype: long
+# double_write_max_log_size = 107374182400
+
+# DoubleWriteProducer cache size
+# Datatype: int
+# double_write_producer_cache_size=1024
+
+# DoubleWriteConsumer concurrency size
+# Datatype: int
+# double_write_consumer_concurrency_size=4
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fb195d3542..654bda0be1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -814,6 +814,33 @@ public class IoTDBConfig {
   /** Encryption provided class parameter */
   private String encryptDecryptProviderParameter;
 
+  // DoubleWrite Config: whether the DoubleWrite feature is enabled (disabled by default)
+  private boolean enableDoubleWrite = false;
+
+  // Secondary IoTDB: address, port, user and password used to connect to it
+  private String secondaryAddress = "127.0.0.1";
+  private int secondaryPort = 6668;
+  private String secondaryUser = "root";
+  private String secondaryPassword = "root";
+
+  // The transmitting concurrency size of double write SessionPool
+  private int doubleWriteSessionConcurrencySize = 8;
+
+  // DoubleWriteLog dir; a relative value is resolved against the IoTDB home via addHomeDir
+  private String doubleWriteLogDir =
+      DEFAULT_BASE_DIR + File.separator + 
IoTDBConstant.DOUBLEWRITE_FOLDER_NAME;
+  // The validity of each DoubleWriteLog, in seconds (DoubleWriteLogService multiplies it by
+  // 1000 before comparing it with a millisecond age)
+  private int doubleWriteLogValidity = 30;
+  // The number of DoubleWriteLog ids: DoubleWriteLogService uses ids in [0, doubleWriteLogNum)
+  // and wraps around with a modulo
+  private int doubleWriteLogNum = 32767;
+  // The max total size of all the DoubleWriteLogs, in bytes. Default is 100GB
+  private long doubleWriteMaxLogSize = 107374182400L;
+
+  // DoubleWrite InsertPlan cache size
+  private int doubleWriteProducerCacheSize = 1024;
+  // DoubleWriteConsumer concurrency size
+  private int doubleWriteConsumerConcurrencySize = 4;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -947,6 +974,7 @@ public class IoTDBConfig {
     extDir = addHomeDir(extDir);
     udfDir = addHomeDir(udfDir);
     triggerDir = addHomeDir(triggerDir);
+    doubleWriteLogDir = addHomeDir(doubleWriteLogDir);
 
     if 
(TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS))
 {
       String hdfsDir = getHdfsDir();
@@ -2551,4 +2579,100 @@ public class IoTDBConfig {
   public void setEncryptDecryptProviderParameter(String 
encryptDecryptProviderParameter) {
     this.encryptDecryptProviderParameter = encryptDecryptProviderParameter;
   }
+
+  /** Returns whether the DoubleWrite feature is enabled. */
+  public boolean isEnableDoubleWrite() {
+    return enableDoubleWrite;
+  }
+
+  /** Enables or disables the DoubleWrite feature. */
+  public void setEnableDoubleWrite(boolean enableDoubleWrite) {
+    this.enableDoubleWrite = enableDoubleWrite;
+  }
+
+  /** Returns the address of the secondary IoTDB. */
+  public String getSecondaryAddress() {
+    return secondaryAddress;
+  }
+
+  /** Sets the address of the secondary IoTDB. */
+  public void setSecondaryAddress(String secondaryAddress) {
+    this.secondaryAddress = secondaryAddress;
+  }
+
+  /** Returns the port of the secondary IoTDB. */
+  public int getSecondaryPort() {
+    return secondaryPort;
+  }
+
+  /** Sets the port of the secondary IoTDB. */
+  public void setSecondaryPort(int secondaryPort) {
+    this.secondaryPort = secondaryPort;
+  }
+
+  /** Returns the user name used to log in to the secondary IoTDB. */
+  public String getSecondaryUser() {
+    return secondaryUser;
+  }
+
+  /** Sets the user name used to log in to the secondary IoTDB. */
+  public void setSecondaryUser(String secondaryUser) {
+    this.secondaryUser = secondaryUser;
+  }
+
+  /** Returns the password used to log in to the secondary IoTDB. */
+  public String getSecondaryPassword() {
+    return secondaryPassword;
+  }
+
+  /** Sets the password used to log in to the secondary IoTDB. */
+  public void setSecondaryPassword(String secondaryPassword) {
+    this.secondaryPassword = secondaryPassword;
+  }
+
+  /** Returns the transmitting concurrency size of the DoubleWrite SessionPool. */
+  public int getDoubleWriteSessionConcurrencySize() {
+    return doubleWriteSessionConcurrencySize;
+  }
+
+  /** Sets the transmitting concurrency size of the DoubleWrite SessionPool. */
+  public void setDoubleWriteSessionConcurrencySize(int 
doubleWriteSessionConcurrencySize) {
+    this.doubleWriteSessionConcurrencySize = doubleWriteSessionConcurrencySize;
+  }
+
+  /** Returns the directory in which the DoubleWriteLogs are stored. */
+  public String getDoubleWriteLogDir() {
+    return doubleWriteLogDir;
+  }
+
+  /** Sets the directory in which the DoubleWriteLogs are stored. */
+  public void setDoubleWriteLogDir(String doubleWriteLogDir) {
+    this.doubleWriteLogDir = doubleWriteLogDir;
+  }
+
+  /** Returns the validity of each DoubleWriteLog. */
+  public int getDoubleWriteLogValidity() {
+    return doubleWriteLogValidity;
+  }
+
+  /** Sets the validity of each DoubleWriteLog. */
+  public void setDoubleWriteLogValidity(int doubleWriteLogValidity) {
+    this.doubleWriteLogValidity = doubleWriteLogValidity;
+  }
+
+  /** Returns the configured number of DoubleWriteLog ids. */
+  public int getDoubleWriteLogNum() {
+    return doubleWriteLogNum;
+  }
+
+  /** Sets the number of DoubleWriteLog ids. */
+  public void setDoubleWriteLogNum(int doubleWriteLogNum) {
+    this.doubleWriteLogNum = doubleWriteLogNum;
+  }
+
+  /** Returns the max total size of all the DoubleWriteLogs. */
+  public long getDoubleWriteMaxLogSize() {
+    return doubleWriteMaxLogSize;
+  }
+
+  /** Sets the max total size of all the DoubleWriteLogs. */
+  public void setDoubleWriteMaxLogSize(long doubleWriteMaxLogSize) {
+    this.doubleWriteMaxLogSize = doubleWriteMaxLogSize;
+  }
+
+  /** Returns the cache size of the DoubleWriteProducer. */
+  public int getDoubleWriteProducerCacheSize() {
+    return doubleWriteProducerCacheSize;
+  }
+
+  /** Sets the cache size of the DoubleWriteProducer. */
+  public void setDoubleWriteProducerCacheSize(int 
doubleWriteProducerCacheSize) {
+    this.doubleWriteProducerCacheSize = doubleWriteProducerCacheSize;
+  }
+
+  /** Returns the concurrency size of the DoubleWriteConsumers. */
+  public int getDoubleWriteConsumerConcurrencySize() {
+    return doubleWriteConsumerConcurrencySize;
+  }
+
+  /** Sets the concurrency size of the DoubleWriteConsumers. */
+  public void setDoubleWriteConsumerConcurrencySize(int 
doubleWriteConsumerConcurrencySize) {
+    this.doubleWriteConsumerConcurrencySize = 
doubleWriteConsumerConcurrencySize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index feab1abd37..848b35e4d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -154,6 +154,9 @@ public class IoTDBConstant {
   public static final String UDF_FOLDER_NAME = "udf";
   public static final String TRIGGER_FOLDER_NAME = "trigger";
 
+  // doublewrite folder name
+  public static final String DOUBLEWRITE_FOLDER_NAME = "doublewrite";
+
   // mqtt
   public static final String ENABLE_MQTT = "enable_mqtt_service";
   public static final String MQTT_HOST_NAME = "mqtt_host";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3167a54f7e..40f9a9bc3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -783,6 +783,61 @@ public class IoTDBDescriptor {
               "iotdb_server_encrypt_decrypt_provider_parameter",
               conf.getEncryptDecryptProviderParameter()));
 
+      // set DoubleWrite config
+      conf.setEnableDoubleWrite(
+          Boolean.parseBoolean(
+              properties.getProperty(
+                  "enable_double_write", 
String.valueOf(conf.isEnableDoubleWrite()))));
+
+      conf.setSecondaryAddress(
+          properties.getProperty("secondary_address", 
conf.getSecondaryAddress()));
+
+      conf.setSecondaryPort(
+          Integer.parseInt(
+              properties.getProperty("secondary_port", 
String.valueOf(conf.getSecondaryPort()))));
+
+      conf.setSecondaryUser(properties.getProperty("secondary_user", 
conf.getSecondaryUser()));
+
+      conf.setSecondaryPassword(
+          properties.getProperty("secondary_password", 
conf.getSecondaryPassword()));
+
+      conf.setDoubleWriteSessionConcurrencySize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "double_write_session_concurrency_size",
+                  
String.valueOf(conf.getDoubleWriteSessionConcurrencySize()))));
+
+      conf.setDoubleWriteLogDir(
+          properties.getProperty("double_write_log_dir", 
conf.getDoubleWriteLogDir()));
+
+      conf.setDoubleWriteLogValidity(
+          Integer.parseInt(
+              properties.getProperty(
+                  "double_write_log_file_validity",
+                  String.valueOf(conf.getDoubleWriteLogValidity()))));
+
+      conf.setDoubleWriteLogNum(
+          Integer.parseInt(
+              properties.getProperty(
+                  "double_write_log_file_num", 
String.valueOf(conf.getDoubleWriteLogNum()))));
+
+      conf.setDoubleWriteMaxLogSize(
+          Long.parseLong(
+              properties.getProperty(
+                  "double_write_max_log_size", 
String.valueOf(conf.getDoubleWriteMaxLogSize()))));
+
+      conf.setDoubleWriteProducerCacheSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "double_write_producer_cache_size",
+                  String.valueOf(conf.getDoubleWriteProducerCacheSize()))));
+
+      conf.setDoubleWriteConsumerConcurrencySize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "double_write_consumer_concurrency_size",
+                  
String.valueOf(conf.getDoubleWriteConsumerConcurrencySize()))));
+
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance()
           .getConfig()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
new file mode 100644
index 0000000000..2d0a3da959
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class DoubleWriteConsumer implements Runnable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DoubleWriteConsumer.class);
+
+  private final BlockingQueue<Pair<ByteBuffer, 
DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+      doubleWriteQueue;
+  private final SessionPool doubleWriteSessionPool;
+  private final DoubleWriteLogService niLogService;
+
+  public DoubleWriteConsumer(
+      BlockingQueue<Pair<ByteBuffer, 
DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+          doubleWriteQueue,
+      SessionPool doubleWriteSessionPool,
+      DoubleWriteLogService niLogService) {
+    this.doubleWriteQueue = doubleWriteQueue;
+    this.doubleWriteSessionPool = doubleWriteSessionPool;
+    this.niLogService = niLogService;
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType> head;
+      ByteBuffer headBuffer;
+      DoubleWritePlanTypeUtils.DoubleWritePlanType headType;
+      try {
+        head = doubleWriteQueue.take();
+        headBuffer = head.left;
+        headType = head.right;
+      } catch (InterruptedException e) {
+        LOGGER.error("DoubleWriteConsumer been interrupted: ", e);
+        continue;
+      }
+
+      if (headType == DoubleWritePlanTypeUtils.DoubleWritePlanType.IPlan) {
+        try {
+          // Sleep 10ms when it's I-Plan
+          TimeUnit.MILLISECONDS.sleep(10);
+        } catch (InterruptedException ignore) {
+          // Ignored
+        }
+      }
+
+      headBuffer.position(0);
+      boolean transmitStatus = false;
+      try {
+        headBuffer.position(0);
+        transmitStatus = 
doubleWriteSessionPool.doubleWriteTransmit(headBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and do serialization
+        LOGGER.warn(
+            "DoubleWriteConsumer can't transmit because network failure", 
connectionException);
+      } catch (Exception e) {
+        // The PhysicalPlan has internal error, reject transmit
+        LOGGER.error("DoubleWriteConsumer can't transmit", e);
+        continue;
+      }
+
+      if (!transmitStatus) {
+        try {
+          // must set buffer position to limit() before serialization
+          headBuffer.position(headBuffer.limit());
+          niLogService.acquireLogWriter();
+          niLogService.write(headBuffer);
+        } catch (IOException e) {
+          LOGGER.error("DoubleWriteConsumer can't serialize physicalPlan", e);
+        }
+        niLogService.releaseLogWriter();
+      }
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
new file mode 100644
index 0000000000..81840d6eee
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class DoubleWriteEProtector extends DoubleWriteProtector {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DoubleWriteEProtector.class);
+
+  private final SessionPool doubleWriteSessionPool;
+
+  public DoubleWriteEProtector(SessionPool doubleWriteSessionPool) {
+    super();
+    this.doubleWriteSessionPool = doubleWriteSessionPool;
+  }
+
+  @Override
+  protected void preCheck() {
+    // do nothing
+  }
+
+  @Override
+  protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan 
physicalPlan) {
+    while (true) {
+      // transmit E-Plan until it's been received
+      boolean transmitStatus = false;
+
+      try {
+        // try double write
+        planBuffer.position(0);
+        transmitStatus = 
doubleWriteSessionPool.doubleWriteTransmit(planBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and retry
+        LOGGER.warn("DoubleWriteEProtector can't transmit, retrying...", 
connectionException);
+      } catch (Exception e) {
+        // error exception and break
+        LOGGER.error("DoubleWriteEProtector can't transmit", e);
+        break;
+      }
+
+      if (transmitStatus) {
+        break;
+      } else {
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e) {
+          LOGGER.warn("DoubleWriteEProtector is interrupted", e);
+        }
+      }
+    }
+  }
+
+  public boolean isAtWork() {
+    return isProtectorAtWork;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
new file mode 100644
index 0000000000..70df59f663
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DoubleWriteLogService implements Runnable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DoubleWriteLogService.class);
+
+  private static final String logFileDir =
+      IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogDir();
+  private static final long logFileValidity =
+      IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogValidity() * 
1000L;
+  private static final int maxLogFileNum =
+      IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogNum();
+  private static final long maxLogFileSize =
+      IoTDBDescriptor.getInstance().getConfig().getDoubleWriteMaxLogSize();
+
+  private static long currentLogFileSize = 0;
+
+  private final DoubleWriteProtector protector;
+  private final Lock logWriterLock;
+  private final String logFileName;
+  private int logFileID;
+  private long logFileCreateTime;
+  private File logFile;
+  private LogWriter logWriter;
+
+  public DoubleWriteLogService(String logFileName, DoubleWriteProtector 
protector) {
+    this.logFileName = logFileName;
+    this.protector = protector;
+
+    this.logWriterLock = new ReentrantLock();
+    this.logFile = null;
+    this.logWriter = null;
+
+    File logDir = new File(logFileDir);
+    if (!logDir.exists()) {
+      if (!logDir.mkdirs()) {
+        LOGGER.error("Can't make DoubleWriteLog file dir: {}", 
logDir.getAbsolutePath());
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    // Check if there exists remnant logs
+    List<Integer> logFileIDList = new ArrayList<>();
+    for (int ID = 0; ID < maxLogFileNum; ID++) {
+      File file =
+          SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + 
logFileName + ID);
+      if (file.exists()) {
+        logFileIDList.add(ID);
+      }
+    }
+
+    int firstID = 0;
+    if (logFileIDList.size() > 0) {
+      // Re-transmit the remnant logs
+      for (int i = 0; i < logFileIDList.size() - 1; i++) {
+        if (logFileIDList.get(i + 1) - logFileIDList.get(i) > 1) {
+          firstID = i + 1;
+          break;
+        }
+      }
+
+      for (int i = firstID; i < logFileIDList.size(); i++) {
+        protector.registerLogFile(logFileDir + File.separator + logFileName + 
logFileIDList.get(i));
+      }
+      for (int i = 0; i < firstID; i++) {
+        protector.registerLogFile(logFileDir + File.separator + logFileName + 
logFileIDList.get(i));
+      }
+
+      int nextID;
+      if (firstID == 0) {
+        nextID = logFileIDList.get(logFileIDList.size() - 1) + 1;
+      } else {
+        nextID = logFileIDList.get(firstID - 1) + 1;
+      }
+      logFileID = nextID % maxLogFileNum;
+    } else {
+      logFileID = 0;
+    }
+
+    while (true) {
+      // Check the validity of logFile
+      logWriterLock.lock();
+      if (logWriter != null && System.currentTimeMillis() - logFileCreateTime 
> logFileValidity) {
+        // Submit logFile when it's expired
+        submitLogFile();
+      }
+      logWriterLock.unlock();
+
+      try {
+        // Sleep 10s before next check
+        TimeUnit.SECONDS.sleep(10);
+      } catch (InterruptedException e) {
+        LOGGER.error("DoubleWriteLogService been interrupted", e);
+      }
+    }
+  }
+
+  private void submitLogFile() {
+    try {
+      logWriter.force();
+    } catch (IOException e) {
+      LOGGER.error("Can't force logWrite", e);
+    }
+    incLogFileSize(logFile.length());
+
+    for (int retry = 0; retry < 5; retry++) {
+      try {
+        logWriter.close();
+      } catch (IOException e) {
+        LOGGER.warn("Can't close DoubleWriteLog: {}, retrying...", 
logFile.getAbsolutePath());
+        try {
+          // Sleep 1s and retry
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException ignored) {
+          // Ignore and retry
+        }
+        continue;
+      }
+
+      LOGGER.info("DoubleWriteLog: {} is expired and closed", 
logFile.getAbsolutePath());
+      break;
+    }
+
+    protector.registerLogFile(
+        logFileDir
+            + File.separator
+            + logFileName
+            + (logFileID - 1 + maxLogFileNum) % maxLogFileNum);
+
+    logWriter = null;
+    logFile = null;
+  }
+
+  private void createLogFile() {
+    logFile =
+        SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + 
logFileName + logFileID);
+    while (true) {
+      try {
+        if (logFile.createNewFile()) {
+          logFileCreateTime = System.currentTimeMillis();
+          logWriter = new LogWriter(logFile, false);
+          LOGGER.info("Create DoubleWriteLog: {}", logFile.getAbsolutePath());
+          break;
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Can't create DoubleWriteLog: {}, retrying...", 
logFile.getAbsolutePath());
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException ignored) {
+          // Ignore and retry
+        }
+      }
+    }
+    logFileID = (logFileID + 1) % maxLogFileNum;
+  }
+
+  public static synchronized void incLogFileSize(long size) {
+    currentLogFileSize = currentLogFileSize + size;
+  }
+
+  public void acquireLogWriter() {
+    logWriterLock.lock();
+  }
+
+  public void write(ByteBuffer buffer) throws IOException {
+    if (currentLogFileSize < maxLogFileSize) {
+      if (logWriter == null) {
+        // Create logFile when there are no valid
+        createLogFile();
+      }
+      logWriter.write(buffer);
+    }
+  }
+
+  public void releaseLogWriter() {
+    logWriterLock.unlock();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
new file mode 100644
index 0000000000..c6c0fd09e8
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class DoubleWriteNIProtector extends DoubleWriteProtector {
+
+  private final DoubleWriteEProtector eProtector;
+  private final DoubleWriteProducer producer;
+
+  public DoubleWriteNIProtector(DoubleWriteEProtector eProtector, 
DoubleWriteProducer producer) {
+    super();
+    this.eProtector = eProtector;
+    this.producer = producer;
+  }
+
+  @Override
+  protected void preCheck() {
+    while (eProtector.isAtWork()) {
+      try {
+        TimeUnit.SECONDS.sleep(5);
+      } catch (InterruptedException ignore) {
+        // ignore and retry
+      }
+    }
+  }
+
+  @Override
+  protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan 
physicalPlan) {
+    producer.put(
+        new Pair<>(planBuffer, 
DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan)));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
new file mode 100644
index 0000000000..3573d5f973
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+
+/** Maps a {@link PhysicalPlan} onto the category double write uses to decide how to ship it. */
+public class DoubleWritePlanTypeUtils {
+
+  /**
+   * Classifies a plan for double write.
+   *
+   * @param plan the plan to classify; may be null
+   * @return {@code EPlan} for storage-group and timeseries create/delete plans, {@code IPlan} for
+   *     a {@link DeletePlan}, {@code NPlan} for an {@link InsertPlan}, and {@code null} for
+   *     anything else (including a null plan)
+   */
+  public static DoubleWritePlanType getDoubleWritePlanType(PhysicalPlan plan) {
+    if (isSchemaPlan(plan)) {
+      return DoubleWritePlanType.EPlan;
+    }
+    if (plan instanceof DeletePlan) {
+      return DoubleWritePlanType.IPlan;
+    }
+    if (plan instanceof InsertPlan) {
+      return DoubleWritePlanType.NPlan;
+    }
+    // not a plan that double write handles
+    return null;
+  }
+
+  /** Tells whether the plan creates or deletes a storage group or timeseries. */
+  private static boolean isSchemaPlan(PhysicalPlan plan) {
+    return plan instanceof SetStorageGroupPlan
+        || plan instanceof DeleteStorageGroupPlan
+        || plan instanceof CreateTimeSeriesPlan
+        || plan instanceof CreateMultiTimeSeriesPlan
+        || plan instanceof CreateAlignedTimeSeriesPlan
+        || plan instanceof DeleteTimeSeriesPlan;
+  }
+
+  /** Categories returned by {@link #getDoubleWritePlanType(PhysicalPlan)}. */
+  public enum DoubleWritePlanType {
+    EPlan,
+    IPlan,
+    NPlan
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
new file mode 100644
index 0000000000..e024487ba2
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * DoubleWriteProducer caches serialized PhysicalPlans, together with their double-write plan
+ * type, in the BlockingQueue it was constructed with.
+ */
+public class DoubleWriteProducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteProducer.class);
+
+  private final BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+      doubleWriteQueue;
+
+  /** @param doubleWriteQueue the queue that {@link #put(Pair)} appends to */
+  public DoubleWriteProducer(
+      BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+          doubleWriteQueue) {
+    this.doubleWriteQueue = doubleWriteQueue;
+  }
+
+  /**
+   * Rewinds the plan buffer to position 0 and appends the pair to the queue, waiting for free
+   * space if the queue is full. If the calling thread is interrupted while waiting, the plan is
+   * not queued, the failure is logged, and the thread's interrupt status is restored so callers
+   * can still observe the interruption.
+   *
+   * @param planPair serialized plan (left) and its double-write plan type (right)
+   */
+  public void put(Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType> planPair) {
+    planPair.left.position(0);
+    try {
+      doubleWriteQueue.put(planPair);
+    } catch (InterruptedException e) {
+      LOGGER.error("double write cache failed.", e);
+      // catching InterruptedException clears the flag; set it again for the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
new file mode 100644
index 0000000000..6eb00355c4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.writelog.io.SingleFileLogReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class DoubleWriteProtector implements Runnable {
+
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(DoubleWriteProtector.class);
+  protected static final int logFileValidity =
+      IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogValidity();
+
+  // For transmit log files
+  protected final Lock logFileListLock;
+  protected List<String> registeredLogFiles;
+  protected List<String> processingLogFiles;
+
+  // For serialize PhysicalPlan
+  private static final int MAX_PHYSICALPLAN_SIZE = 16 * 1024 * 1024;
+  protected final ByteArrayOutputStream protectorByteStream;
+  protected final DataOutputStream protectorDeserializeStream;
+
+  // Working state
+  protected volatile boolean isProtectorAtWork;
+
+  protected DoubleWriteProtector() {
+    logFileListLock = new ReentrantLock();
+    registeredLogFiles = new ArrayList<>();
+
+    protectorByteStream = new ByteArrayOutputStream(MAX_PHYSICALPLAN_SIZE);
+    protectorDeserializeStream = new DataOutputStream(protectorByteStream);
+
+    isProtectorAtWork = false;
+  }
+
+  protected void registerLogFile(String logFile) {
+    logFileListLock.lock();
+    registeredLogFiles.add(logFile);
+    logFileListLock.unlock();
+  }
+
+  protected void wrapLogFiles() {
+    processingLogFiles = new ArrayList<>(registeredLogFiles);
+    registeredLogFiles = new ArrayList<>();
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      while (true) {
+        // Wrap and transmit all DoubleWriteLogs
+        logFileListLock.lock();
+        if (registeredLogFiles.size() > 0) {
+          isProtectorAtWork = true;
+          wrapLogFiles();
+          logFileListLock.unlock();
+        } else {
+          isProtectorAtWork = false;
+          logFileListLock.unlock();
+          break;
+        }
+        if (isProtectorAtWork) {
+          transmitLogFiles();
+        }
+      }
+
+      try {
+        // Sleep a while before next check
+        TimeUnit.SECONDS.sleep(logFileValidity);
+      } catch (InterruptedException e) {
+        LOGGER.warn("DoubleWriteProtector been interrupted", e);
+      }
+    }
+  }
+
+  protected void transmitLogFiles() {
+    preCheck();
+    for (String logFileName : processingLogFiles) {
+      File logFile = SystemFileFactory.INSTANCE.getFile(logFileName);
+      SingleFileLogReader logReader;
+      try {
+        logReader = new SingleFileLogReader(logFile);
+      } catch (FileNotFoundException e) {
+        LOGGER.error(
+            "DoubleWriteProtector can't open DoubleWriteLog: {}, discarded",
+            logFile.getAbsolutePath(),
+            e);
+        continue;
+      }
+
+      while (logReader.hasNext()) {
+        // read and re-serialize the PhysicalPlan
+        PhysicalPlan nextPlan = logReader.next();
+        try {
+          nextPlan.serialize(protectorDeserializeStream);
+        } catch (IOException e) {
+          LOGGER.error("DoubleWriteProtector can't serialize PhysicalPlan", e);
+          continue;
+        }
+        ByteBuffer nextBuffer = 
ByteBuffer.wrap(protectorByteStream.toByteArray());
+        protectorByteStream.reset();
+        transmitPhysicalPlan(nextBuffer, nextPlan);
+      }
+
+      logReader.close();
+      try {
+        // sleep one second then delete DoubleWriteLog
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        LOGGER.warn("DoubleWriteProtector is interrupted", e);
+      }
+
+      DoubleWriteLogService.incLogFileSize(-logFile.length());
+
+      boolean deleted = false;
+      for (int retryCnt = 0; retryCnt < 5; retryCnt++) {
+        if (logFile.delete()) {
+          deleted = true;
+          LOGGER.info("DoubleWriteLog: {} is deleted.", 
logFile.getAbsolutePath());
+          break;
+        } else {
+          LOGGER.warn("Delete DoubleWriteLog: {} failed. Retrying", 
logFile.getAbsolutePath());
+        }
+      }
+      if (!deleted) {
+        DoubleWriteLogService.incLogFileSize(logFile.length());
+        LOGGER.error("Couldn't delete DoubleWriteLog: {}", 
logFile.getAbsolutePath());
+      }
+    }
+  }
+
+  protected abstract void preCheck();
+
+  protected abstract void transmitPhysicalPlan(ByteBuffer planBuffer, 
PhysicalPlan physicalPlan);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java 
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
new file mode 100644
index 0000000000..3bcef6bf58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * DoubleWriteTask transmits one serialized E-Plan through the double-write SessionPool, and
+ * writes it to the E-log instead when the E-protector is at work or the transmission does not
+ * succeed.
+ */
+public class DoubleWriteTask implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteTask.class);
+
+  private final ByteBuffer physicalPlanBuffer;
+  private final SessionPool doubleWriteSessionPool;
+  private final DoubleWriteEProtector eProtector;
+  private final DoubleWriteLogService eLogService;
+
+  /**
+   * @param physicalPlanBuffer serialized E-Plan to transmit
+   * @param doubleWriteSessionPool session pool used for the transmission
+   * @param eProtector protector whose working state decides between transmitting and logging
+   * @param eLogService log service the plan is written to when it is not transmitted
+   */
+  public DoubleWriteTask(
+      ByteBuffer physicalPlanBuffer,
+      SessionPool doubleWriteSessionPool,
+      DoubleWriteEProtector eProtector,
+      DoubleWriteLogService eLogService) {
+    this.physicalPlanBuffer = physicalPlanBuffer;
+    this.doubleWriteSessionPool = doubleWriteSessionPool;
+    this.eProtector = eProtector;
+    this.eLogService = eLogService;
+  }
+
+  /**
+   * Logs the plan directly while the E-protector is at work. Otherwise tries to transmit it: a
+   * connection failure or an unsuccessful transmit falls back to logging, any other exception
+   * rejects the plan without logging it.
+   */
+  @Override
+  public void run() {
+    if (eProtector.isAtWork()) {
+      serializeEPlan();
+    } else {
+      boolean transmitStatus = false;
+      try {
+        physicalPlanBuffer.position(0);
+        transmitStatus = doubleWriteSessionPool.doubleWriteTransmit(physicalPlanBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and do serialization
+        LOGGER.warn("DoubleWriteTask can't transmit because network failure", connectionException);
+      } catch (Exception e) {
+        // The PhysicalPlan has internal error, reject transmit
+        LOGGER.error("DoubleWriteTask can't transmit", e);
+        return;
+      }
+      if (!transmitStatus) {
+        serializeEPlan();
+      }
+    }
+  }
+
+  /** Writes the plan buffer to the E-log, always releasing the log writer afterwards. */
+  private void serializeEPlan() {
+    // serialize the E-Plan if necessary
+    try {
+      // must set buffer position to limit() before serialization
+      physicalPlanBuffer.position(physicalPlanBuffer.limit());
+      eLogService.acquireLogWriter();
+      eLogService.write(physicalPlanBuffer);
+    } catch (IOException e) {
+      LOGGER.error("can't serialize current PhysicalPlan", e);
+    } finally {
+      // release on every exit path, so an unchecked exception cannot leave the writer held
+      eLogService.releaseLogWriter();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3f9dce62b3..80ffcbb14f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,6 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.doublewrite.DoubleWriteConsumer;
+import org.apache.iotdb.db.doublewrite.DoubleWriteEProtector;
+import org.apache.iotdb.db.doublewrite.DoubleWriteLogService;
+import org.apache.iotdb.db.doublewrite.DoubleWriteNIProtector;
+import org.apache.iotdb.db.doublewrite.DoubleWritePlanTypeUtils;
+import org.apache.iotdb.db.doublewrite.DoubleWriteProducer;
+import org.apache.iotdb.db.doublewrite.DoubleWriteTask;
 import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -106,6 +113,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
 import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
@@ -120,6 +128,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.pool.SessionPool;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -127,11 +136,14 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -142,6 +154,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -302,9 +316,61 @@ public class TSServiceImpl implements TSIService.Iface {
 
   protected final ServiceProvider serviceProvider;
 
+  /* Double write module */
+  private static final boolean isEnableDoubleWrite =
+      IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite();
+  private final SessionPool doubleWriteSessionPool;
+  private final DoubleWriteProducer doubleWriteProducer;
+  private final DoubleWriteEProtector doubleWriteEProtector;
+  private final DoubleWriteLogService doubleWriteELogService;
+
+  /**
+   * Creates the service and, when isEnableDoubleWrite is set, builds and starts the double write
+   * module: a SessionPool to the configured secondary, the E protector and its log service, the
+   * producer queue, the NI protector and its log service, and the consumers. Each protector, log
+   * service and consumer runs on its own newly started thread. When double write is disabled the
+   * double write fields are set to null.
+   */
   public TSServiceImpl() {
     super();
     serviceProvider = IoTDB.serviceProvider;
+
+    if (isEnableDoubleWrite) {
+      /* Open double write */
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      // create SessionPool for double write
+      // NOTE(review): the trailing 5 is presumably the pool's maximum size - confirm
+      doubleWriteSessionPool =
+          new SessionPool(
+              config.getSecondaryAddress(),
+              config.getSecondaryPort(),
+              config.getSecondaryUser(),
+              config.getSecondaryPassword(),
+              5);
+
+      // create DoubleWriteEProtector and DoubleWriteELogService
+      doubleWriteEProtector = new 
DoubleWriteEProtector(doubleWriteSessionPool);
+      new Thread(doubleWriteEProtector).start();
+      doubleWriteELogService = new DoubleWriteLogService("ELog", 
doubleWriteEProtector);
+      new Thread(doubleWriteELogService).start();
+
+      // create DoubleWriteProducer
+      // the queue is bounded by the configured producer cache size and shared with the consumers
+      BlockingQueue<Pair<ByteBuffer, 
DoubleWritePlanTypeUtils.DoubleWritePlanType>> blockingQueue =
+          new ArrayBlockingQueue<>(config.getDoubleWriteProducerCacheSize());
+      doubleWriteProducer = new DoubleWriteProducer(blockingQueue);
+
+      // create DoubleWriteNIProtector and DoubleWriteNILogService
+      DoubleWriteNIProtector doubleWriteNIProtector =
+          new DoubleWriteNIProtector(doubleWriteEProtector, 
doubleWriteProducer);
+      new Thread(doubleWriteNIProtector).start();
+      DoubleWriteLogService doubleWriteNILogService =
+          new DoubleWriteLogService("NILog", doubleWriteNIProtector);
+      new Thread(doubleWriteNILogService).start();
+
+      // create DoubleWriteConsumer
+      // one consumer thread per configured concurrency slot, all draining the same queue
+      for (int i = 0; i < config.getDoubleWriteConsumerConcurrencySize(); i++) 
{
+        DoubleWriteConsumer consumer =
+            new DoubleWriteConsumer(blockingQueue, doubleWriteSessionPool, 
doubleWriteNILogService);
+        new Thread(consumer).start();
+      }
+    } else {
+      // double write disabled: the final fields still have to be assigned
+      doubleWriteSessionPool = null;
+      doubleWriteProducer = null;
+      doubleWriteEProtector = null;
+      doubleWriteELogService = null;
+    }
   }
 
   @Override
@@ -1484,7 +1550,12 @@ public class TSServiceImpl implements TSIService.Iface {
               req.values,
               req.isAligned);
       TSStatus status = serviceProvider.checkAuthority(plan, 
req.getSessionId());
-      return status != null ? status : executeNonQueryPlan(plan);
+
+      if (status != null) {
+        return status;
+      }
+
+      return executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_RECORD, 
e.getErrorCode());
     } catch (Exception e) {
@@ -1515,7 +1586,12 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
       TSStatus status = serviceProvider.checkAuthority(plan, 
req.getSessionId());
-      return status != null ? status : executeNonQueryPlan(plan);
+
+      if (status != null) {
+        return status;
+      }
+
+      return executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, 
e.getErrorCode());
     } catch (Exception e) {
@@ -1571,7 +1647,11 @@ public class TSServiceImpl implements TSIService.Iface {
       insertTabletPlan.setAligned(req.isAligned);
       TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, 
req.getSessionId());
 
-      return status != null ? status : executeNonQueryPlan(insertTabletPlan);
+      if (status != null) {
+        return status;
+      }
+
+      return executeNonQueryPlan(insertTabletPlan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_TABLET, 
e.getErrorCode());
     } catch (Exception e) {
@@ -2057,7 +2137,86 @@ public class TSServiceImpl implements TSIService.Iface {
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
+  /**
+   * Deserializes the PhysicalPlan carried by a double write request and executes it locally.
+   *
+   * @param req request whose {@code physicalPlan} buffer holds the serialized plan
+   * @return a success status if the plan was executed; an EXECUTE_STATEMENT_ERROR status if the
+   *     plan cannot be deserialized, is not a type double write supports, or execution reports
+   *     failure; the status built by onNonQueryException if execution throws
+   */
+  @Override
+  public TSStatus executeDoubleWrite(TSInternalSyncWriteReq req) {
+    PhysicalPlan physicalPlan;
+    try {
+      ByteBuffer planBuffer = req.physicalPlan;
+      planBuffer.position(0);
+      physicalPlan = PhysicalPlan.Factory.create(planBuffer);
+    } catch (IllegalPathException | IOException e) {
+      LOGGER.error("double write deserialization failed.", e);
+      // without a plan there is nothing to classify or execute; report the failure to the sender
+      return RpcUtils.getStatus(
+          TSStatusCode.EXECUTE_STATEMENT_ERROR,
+          "double write deserialization failed: " + e.getMessage());
+    }
+
+    DoubleWritePlanTypeUtils.DoubleWritePlanType planType =
+        DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan);
+    if (planType == null) {
+      // a null plan is also classified as null, so guard the name lookup
+      LOGGER.error(
+          "DoubleWrite receive unsupported PhysicalPlan type: {}",
+          physicalPlan == null ? null : physicalPlan.getOperatorName());
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+
+    try {
+      return serviceProvider.executeNonQuery(physicalPlan)
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
+          : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } catch (Exception e) {
+      return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
+    }
+  }
+
+  /**
+   * Serializes the plan and ships it to the double write module. Plans that
+   * DoubleWritePlanTypeUtils does not classify are ignored. An E-Plan is handed to a
+   * DoubleWriteTask on a new thread and this method waits for that thread to finish; I-Plans and
+   * N-Plans are put into the DoubleWriteProducer.
+   *
+   * @param physicalPlan the plan about to be executed locally
+   */
+  private void transmitDoubleWrite(PhysicalPlan physicalPlan) {
+
+    DoubleWritePlanTypeUtils.DoubleWritePlanType planType =
+        DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan);
+    if (planType == null) {
+      // Don't need DoubleWrite
+      return;
+    }
+
+    // serialize physical plan
+    ByteBuffer buffer;
+    try {
+      int size = physicalPlan.getSerializedSize();
+      ByteArrayOutputStream doubleWriteByteStream = new ByteArrayOutputStream(size);
+      DataOutputStream doubleWriteSerializeStream = new DataOutputStream(doubleWriteByteStream);
+      physicalPlan.serialize(doubleWriteSerializeStream);
+      buffer = ByteBuffer.wrap(doubleWriteByteStream.toByteArray());
+    } catch (IOException e) {
+      LOGGER.error("DoubleWrite can't serialize PhysicalPlan", e);
+      return;
+    }
+
+    switch (planType) {
+      case EPlan:
+        // Create DoubleWriteTask and wait
+        Thread taskThread =
+            new Thread(
+                new DoubleWriteTask(
+                    buffer, doubleWriteSessionPool, doubleWriteEProtector, doubleWriteELogService));
+        taskThread.start();
+        try {
+          taskThread.join();
+        } catch (InterruptedException e) {
+          LOGGER.error("DoubleWriteTask been interrupted", e);
+          // catching InterruptedException clears the flag; set it again for the caller
+          Thread.currentThread().interrupt();
+        }
+        break;
+      case IPlan:
+      case NPlan:
+        // Put into DoubleWriteProducer
+        doubleWriteProducer.put(new Pair<>(buffer, planType));
+        break;
+    }
+  }
+
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+    if (isEnableDoubleWrite) {
+      // DoubleWrite should transmit before execute
+      transmitDoubleWrite(plan);
+    }
+
     try {
       return serviceProvider.executeNonQuery(plan)
           ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute 
successfully")
diff --git 
a/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
 
b/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
new file mode 100644
index 0000000000..d7cb8d9b62
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
@@ -0,0 +1,107 @@
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Helpers for manually exercising double write against an IoTDB reachable at 127.0.0.1:6667.
+ * They operate on storage groups root.sg0..root.sg9, each with devices d0..d19 and, per device,
+ * INT32 series s0..s99.
+ */
+public class DoubleWriteManualTestUtils {
+
+  private static final SessionPool sessionPool =
+      new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+
+  private static final String sg = "root.sg";
+  private static final int sgCnt = 10;
+  private static final String d = ".d";
+  private static final int dCnt = 20;
+  private static final String s = ".s";
+  private static final int sCnt = 100;
+  private static final int dataCnt = 1000;
+
+  /** Creates every storage group. */
+  public void setStorageGroups() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      sessionPool.setStorageGroup(sg + i);
+    }
+  }
+
+  /** Deletes every storage group, one request per group. */
+  public void deleteStorageGroups() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      sessionPool.deleteStorageGroups(Collections.singletonList(sg + i));
+    }
+  }
+
+  /** Creates every series as INT32 / PLAIN / UNCOMPRESSED, one request per series. */
+  public void createTimeSeries() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String storageGroup = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String devicePath = storageGroup + d + j;
+        for (int k = 0; k < sCnt; k++) {
+          sessionPool.createTimeseries(
+              devicePath + s + k, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED);
+        }
+      }
+    }
+  }
+
+  /** Deletes every series, one request per series. */
+  public void deleteTimeSeries() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String storageGroup = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String devicePath = storageGroup + d + j;
+        for (int k = 0; k < sCnt; k++) {
+          sessionPool.deleteTimeseries(devicePath + s + k);
+        }
+      }
+    }
+  }
+
+  /**
+   * Inserts dataCnt records into every device, with timestamps 0..dataCnt-1 and each series set
+   * to the timestamp's value, then prints the average wall-clock time per insertRecord call.
+   */
+  public void insertData() throws IoTDBConnectionException, StatementExecutionException {
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < sgCnt; i++) {
+      String storageGroup = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String device = storageGroup + d + j;
+        List<String> measurements = new ArrayList<>();
+        List<TSDataType> types = new ArrayList<>();
+        for (int k = 0; k < sCnt; k++) {
+          measurements.add("s" + k);
+          types.add(TSDataType.INT32);
+        }
+        for (int l = 0; l < dataCnt; l++) {
+          List<Object> values = new ArrayList<>();
+          for (int k = 0; k < sCnt; k++) {
+            values.add(l);
+          }
+          sessionPool.insertRecord(device, l, measurements, types, values);
+        }
+      }
+    }
+    long endTime = System.currentTimeMillis();
+    // one insertRecord call per (storage group, device, timestamp) combination
+    long insertCnt = (long) sgCnt * dCnt * dataCnt;
+    System.out.println(
+        "Avg time per insert: " + ((endTime - startTime) / (double) insertCnt) + "ms");
+  }
+
+  /** Deletes the time range [0, dataCnt] of every series, one request per series. */
+  public void deleteData() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String storageGroup = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String devicePath = storageGroup + d + j;
+        for (int k = 0; k < sCnt; k++) {
+          sessionPool.deleteData(Collections.singletonList(devicePath + s + k), 0, dataCnt);
+        }
+      }
+    }
+  }
+}
diff --git a/session/pom.xml b/session/pom.xml
index 634ad7c685..7072a7c426 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -71,19 +71,6 @@
             <artifactId>service-rpc</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>iotdb-jdbc</artifactId>
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9a31211403..64555f584e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
@@ -1022,8 +1023,11 @@ public class Session {
           genTSInsertStringRecordsReq(deviceIds, times, measurementsList, 
valuesList, false);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : 
deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1056,8 +1060,11 @@ public class Session {
           genTSInsertStringRecordsReq(deviceIds, times, measurementsList, 
valuesList, true);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : 
deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1139,8 +1146,11 @@ public class Session {
           genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, 
valuesList, false);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : 
deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1174,8 +1184,11 @@ public class Session {
           genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, 
valuesList, true);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : 
deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1657,8 +1670,11 @@ public class Session {
           genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, 
false);
       try {
         defaultSessionConnection.insertTablets(request);
-      } catch (RedirectException ignored) {
-        // ignored
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : 
deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1692,8 +1708,11 @@ public class Session {
           genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, 
true);
       try {
         defaultSessionConnection.insertTablets(request);
-      } catch (RedirectException ignored) {
-        // ignored
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : 
deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -2442,6 +2461,24 @@ public class Session {
     }
   }
 
+  /**
+   * Forwards a double-write payload to the server over the default connection.
+   *
+   * <p>The buffer is wrapped in a {@link TSInternalSyncWriteReq} by {@code
+   * genTSExecuteDoubleWriteReq} and sent through {@code
+   * defaultSessionConnection.executeDoubleWrite}. A {@link RedirectException} raised by that call
+   * is deliberately swallowed.
+   *
+   * @param buffer bytes placed into the request's {@code physicalPlan} field
+   * @throws IoTDBConnectionException propagated from the connection
+   * @throws StatementExecutionException propagated from the connection
+   */
+  public void doubleWriteTransmit(ByteBuffer buffer)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final TSInternalSyncWriteReq syncWriteReq = genTSExecuteDoubleWriteReq(buffer);
+    try {
+      defaultSessionConnection.executeDoubleWrite(syncWriteReq);
+    } catch (RedirectException ignored) {
+      // Redirection hints are not acted upon for double-write traffic.
+    }
+  }
+
+  /**
+   * Builds the request object sent by {@code doubleWriteTransmit}.
+   *
+   * <p>Only {@code doubleWriteType} and {@code physicalPlan} are populated here; the session id is
+   * not set by this method.
+   *
+   * @param buffer bytes stored as the request's {@code physicalPlan}
+   * @return the populated request
+   */
+  private TSInternalSyncWriteReq genTSExecuteDoubleWriteReq(ByteBuffer buffer) {
+    final TSInternalSyncWriteReq syncWriteReq = new TSInternalSyncWriteReq();
+    syncWriteReq.setPhysicalPlan(buffer);
+    // NOTE(review): type code 0 is hard-coded; its meaning is defined outside this file — confirm.
+    syncWriteReq.setDoubleWriteType((byte) 0);
+    return syncWriteReq;
+  }
+
   public boolean isEnableQueryRedirection() {
     return enableQueryRedirection;
   }
@@ -2470,7 +2507,7 @@ public class Session {
     private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
     private Version version = Config.DEFAULT_VERSION;
 
-    List<String> nodeUrls = null;
+    private List<String> nodeUrls = null;
 
     public Builder host(String host) {
       this.host = host;
@@ -2535,16 +2572,19 @@ public class Session {
       }
 
       if (nodeUrls != null) {
-        return new Session(
-            nodeUrls,
-            username,
-            password,
-            fetchSize,
-            zoneId,
-            thriftDefaultBufferSize,
-            thriftMaxFrameSize,
-            enableCacheLeader,
-            version);
+        Session newSession =
+            new Session(
+                nodeUrls,
+                username,
+                password,
+                fetchSize,
+                zoneId,
+                thriftDefaultBufferSize,
+                thriftMaxFrameSize,
+                enableCacheLeader,
+                version);
+        newSession.setEnableQueryRedirection(true);
+        return newSession;
       }
 
       return new Session(
diff --git 
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java 
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 930f5baaa1..9ec01d4c7c 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
 import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
@@ -942,6 +943,25 @@ public class SessionConnection {
     }
   }
 
+  /**
+   * Sends a double-write request, retrying once after a successful reconnect.
+   *
+   * <p>The current {@code sessionId} is written into the request before each attempt. The first
+   * attempt is checked with {@code RpcUtils.verifySuccessWithRedirection}; the retry is checked
+   * with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request request to send; its session id is overwritten
+   * @throws IoTDBConnectionException if {@code reconnect()} returns false, or if the retry itself
+   *     fails with a {@code TException}
+   * @throws StatementExecutionException declared for the status checks
+   * @throws RedirectException declared for the redirection-aware status check
+   */
+  protected void executeDoubleWrite(TSInternalSyncWriteReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.executeDoubleWrite(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      try {
+        // Re-apply the session id before the single retry.
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.executeDoubleWrite(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
   public boolean isEnableRedirect() {
     return enableRedirect;
   }
diff --git 
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java 
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 900f4185e4..ad8561bab5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
@@ -93,6 +94,9 @@ public class SessionPool {
   // whether the queue is closed.
   private boolean closed;
 
+  // Redirect-able SessionPool
+  private final List<String> nodeUrls;
+
   public SessionPool(String host, int port, String user, String password, int 
maxSize) {
     this(
         host,
@@ -108,6 +112,20 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  /**
+   * Creates a pool whose sessions are built from a list of node URLs.
+   *
+   * <p>Delegates to the full node-URL constructor with {@code Config.DEFAULT_FETCH_SIZE}, a
+   * 60_000 ms wait-for-session timeout, compression disabled, a {@code null} zone id, {@code
+   * Config.DEFAULT_CACHE_LEADER_MODE} and {@code Config.DEFAULT_CONNECTION_TIMEOUT_MS}.
+   */
+  public SessionPool(List<String> nodeUrls, String user, String password, int 
maxSize) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        false,
+        null,
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   public SessionPool(
       String host, int port, String user, String password, int maxSize, 
boolean enableCompression) {
     this(
@@ -124,6 +142,21 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  /**
+   * Creates a node-URL based pool with an explicit compression setting.
+   *
+   * <p>Delegates to the full node-URL constructor with {@code Config.DEFAULT_FETCH_SIZE}, a
+   * 60_000 ms wait-for-session timeout, a {@code null} zone id, {@code
+   * Config.DEFAULT_CACHE_LEADER_MODE} and {@code Config.DEFAULT_CONNECTION_TIMEOUT_MS}.
+   */
+  public SessionPool(
+      List<String> nodeUrls, String user, String password, int maxSize, 
boolean enableCompression) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        enableCompression,
+        null,
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   public SessionPool(
       String host,
       int port,
@@ -146,6 +179,26 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  /**
+   * Creates a node-URL based pool with explicit compression and cache-leader settings.
+   *
+   * <p>Delegates to the full node-URL constructor with {@code Config.DEFAULT_FETCH_SIZE}, a
+   * 60_000 ms wait-for-session timeout, a {@code null} zone id and {@code
+   * Config.DEFAULT_CONNECTION_TIMEOUT_MS}.
+   */
+  public SessionPool(
+      List<String> nodeUrls,
+      String user,
+      String password,
+      int maxSize,
+      boolean enableCompression,
+      boolean enableCacheLeader) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        enableCompression,
+        null,
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   public SessionPool(
       String host, int port, String user, String password, int maxSize, ZoneId 
zoneId) {
     this(
@@ -162,6 +215,21 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  /**
+   * Creates a node-URL based pool with an explicit zone id.
+   *
+   * <p>Delegates to the full node-URL constructor with {@code Config.DEFAULT_FETCH_SIZE}, a
+   * 60_000 ms wait-for-session timeout, compression disabled, {@code
+   * Config.DEFAULT_CACHE_LEADER_MODE} and {@code Config.DEFAULT_CONNECTION_TIMEOUT_MS}.
+   */
+  public SessionPool(
+      List<String> nodeUrls, String user, String password, int maxSize, ZoneId 
zoneId) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        false,
+        zoneId,
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   @SuppressWarnings("squid:S107")
   public SessionPool(
       String host,
@@ -178,6 +246,7 @@ public class SessionPool {
     this.maxSize = maxSize;
     this.host = host;
     this.port = port;
+    this.nodeUrls = null;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
@@ -188,6 +257,60 @@ public class SessionPool {
     this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
+  /**
+   * Creates a pool whose sessions are built from a list of node URLs rather than a single
+   * host/port pair.
+   *
+   * <p>{@code host} is set to {@code null} and {@code port} to {@code -1}. The {@code nodeUrls}
+   * reference is stored as given (no copy is made); {@code constructNewSession()} takes the
+   * node-URL path whenever it is non-null.
+   *
+   * @param nodeUrls node URLs handed to {@code Session.Builder.nodeUrls}
+   * @param user user name used for every session
+   * @param password password used for every session
+   * @param maxSize stored as the pool's {@code maxSize}
+   * @param fetchSize fetch size passed to each session
+   * @param waitToGetSessionTimeoutInMs stored as the pool's wait-for-session timeout
+   * @param enableCompression passed to {@code Session.open}
+   * @param zoneId zone id passed to each session
+   * @param enableCacheLeader cache-leader flag passed to each session
+   * @param connectionTimeoutInMs passed to {@code Session.open}
+   */
+  public SessionPool(
+      List<String> nodeUrls,
+      String user,
+      String password,
+      int maxSize,
+      int fetchSize,
+      long waitToGetSessionTimeoutInMs,
+      boolean enableCompression,
+      ZoneId zoneId,
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
+    // Endpoint selection: node URLs are used, so the single-endpoint fields are placeholders.
+    this.nodeUrls = nodeUrls;
+    this.host = null;
+    this.port = -1;
+
+    // Credentials.
+    this.user = user;
+    this.password = password;
+
+    // Pool sizing and timeouts.
+    this.maxSize = maxSize;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+
+    // Per-session settings.
+    this.fetchSize = fetchSize;
+    this.enableCompression = enableCompression;
+    this.zoneId = zoneId;
+    this.enableCacheLeader = enableCacheLeader;
+  }
+
+  /**
+   * Builds (but does not open) a session configured from this pool's settings.
+   *
+   * <p>When {@code nodeUrls} is {@code null} the session is built from {@code host} and {@code
+   * port}; otherwise it is built from {@code nodeUrls}. User, password, fetch size, zone id and
+   * the cache-leader flag are applied in both cases.
+   *
+   * @return the newly built session
+   */
+  private Session constructNewSession() {
+    final Session.Builder builder =
+        new Session.Builder()
+            .username(user)
+            .password(password)
+            .fetchSize(fetchSize)
+            .zoneId(zoneId)
+            .enableCacheLeader(enableCacheLeader);
+    if (nodeUrls != null) {
+      // Redirect-able session over the configured node URLs.
+      return builder.nodeUrls(nodeUrls).build();
+    }
+    // Custom single-endpoint session.
+    return builder.host(host).port(port).build();
+  }
+
   // if this method throws an exception, either the server is broken, or the 
ip/port/user/password
   // is incorrect.
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive 
Complexity warning
@@ -254,9 +377,15 @@ public class SessionPool {
     if (shouldCreate) {
       // create a new one.
       if (logger.isDebugEnabled()) {
-        logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, 
password);
+        if (nodeUrls == null) {
+          logger.debug("Create a new Session {}, {}, {}, {}", host, port, 
user, password);
+        } else {
+          logger.debug("Create a new redirect Session {}, {}, {}", nodeUrls, 
user, password);
+        }
       }
-      session = new Session(host, port, user, password, fetchSize, zoneId, 
enableCacheLeader);
+
+      session = constructNewSession();
+
       try {
         session.open(enableCompression, connectionTimeoutInMs);
         // avoid someone has called close() the session pool
@@ -352,7 +481,7 @@ public class SessionPool {
 
   @SuppressWarnings({"squid:S2446"})
   private void tryConstructNewSession() {
-    Session session = new Session(host, port, user, password, fetchSize, 
zoneId, enableCacheLeader);
+    Session session = constructNewSession();
     try {
       session.open(enableCompression, connectionTimeoutInMs);
       // avoid someone has called close() the session pool
@@ -2129,6 +2258,28 @@ public class SessionPool {
     return null;
   }
 
+  /**
+   * Transmits a double-write payload through a pooled session, making up to {@code RETRY}
+   * attempts.
+   *
+   * <p>The buffer's position is reset to 0 before every attempt. On success the session is put
+   * back into the pool. On {@link StatementExecutionException} or {@link RuntimeException} the
+   * session is put back and the exception is rethrown.
+   *
+   * @param buffer payload handed to {@code Session.doubleWriteTransmit}; its position is modified
+   * @return {@code true} as soon as one attempt succeeds; {@code false} if every attempt ended in
+   *     a connection failure that the cleanup helper did not rethrow
+   * @throws IoTDBConnectionException if the cleanup helper rethrows a connection failure
+   * @throws StatementExecutionException if the server rejects the request
+   */
+  public boolean doubleWriteTransmit(ByteBuffer buffer)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session pooled = getSession();
+      try {
+        buffer.position(0);
+        pooled.doubleWriteTransmit(buffer);
+        putBack(pooled);
+        return true;
+      } catch (IoTDBConnectionException e) {
+        // Connection-level failure: hand the session to the cleanup helper (which, per its name,
+        // may rethrow); if it returns, try again with another session.
+        cleanSessionAndMayThrowConnectionException(pooled, attempt, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(pooled);
+        throw e;
+      }
+      attempt++;
+    }
+
+    return false;
+  }
+
   public int getMaxSize() {
     return maxSize;
   }
@@ -2177,6 +2328,7 @@ public class SessionPool {
 
     private String host = Config.DEFAULT_HOST;
     private int port = Config.DEFAULT_PORT;
+    private List<String> nodeUrls = null;
     private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
     private String user = Config.DEFAULT_USER;
     private String password = Config.DEFAULT_PASSWORD;
@@ -2197,6 +2349,11 @@ public class SessionPool {
       return this;
     }
 
+    /**
+     * Sets the node URLs for the pool being built.
+     *
+     * <p>When this is left {@code null}, {@code build()} uses the host/port constructor instead.
+     *
+     * @param nodeUrls node URLs to store on this builder
+     * @return this builder
+     */
+    public Builder nodeUrls(List<String> nodeUrls) {
+      this.nodeUrls = nodeUrls;
+      return this;
+    }
+
     public Builder maxSize(int maxSize) {
       this.maxSize = maxSize;
       return this;
@@ -2243,18 +2400,32 @@ public class SessionPool {
     }
 
     public SessionPool build() {
-      return new SessionPool(
-          host,
-          port,
-          user,
-          password,
-          maxSize,
-          fetchSize,
-          waitToGetSessionTimeoutInMs,
-          enableCompression,
-          zoneId,
-          enableCacheLeader,
-          connectionTimeoutInMs);
+      if (nodeUrls == null) {
+        return new SessionPool(
+            host,
+            port,
+            user,
+            password,
+            maxSize,
+            fetchSize,
+            waitToGetSessionTimeoutInMs,
+            enableCompression,
+            zoneId,
+            enableCacheLeader,
+            connectionTimeoutInMs);
+      } else {
+        return new SessionPool(
+            nodeUrls,
+            user,
+            password,
+            maxSize,
+            fetchSize,
+            waitToGetSessionTimeoutInMs,
+            enableCompression,
+            zoneId,
+            enableCacheLeader,
+            connectionTimeoutInMs);
+      }
     }
   }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift 
b/thrift/src/main/thrift/rpc.thrift
index 78ccce6db6..2fdeed1847 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -416,6 +416,12 @@ struct TSDropSchemaTemplateReq {
   2: required string templateName
 }
 
+// Request payload for TSIService.executeDoubleWrite.
+struct TSInternalSyncWriteReq {
+  // Session id of the sending client; the Java client fills this in before each send.
+  1: required i64 sessionId
+  // Type code of the double-write request; the Java client in this change always sends 0.
+  2: required byte doubleWriteType
+  // Raw bytes of the plan to apply — presumably a serialized physical plan, per the field name; confirm against the server.
+  3: required binary physicalPlan
+}
+
 service TSIService {
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);
 
@@ -506,4 +512,6 @@ service TSIService {
   TSStatus unsetSchemaTemplate(1:TSUnsetSchemaTemplateReq req);
 
   TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
+
+  TSStatus executeDoubleWrite(1:TSInternalSyncWriteReq req);
 }
\ No newline at end of file

Reply via email to