This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch double_live
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/double_live by this push:
new 04b36aa128 [To double_live] DoubleWrite for 0.13 (#5311)
04b36aa128 is described below
commit 04b36aa128fb3bd24dd712194148945673741c8b
Author: YongzaoDan <[email protected]>
AuthorDate: Tue Apr 12 12:45:34 2022 +0800
[To double_live] DoubleWrite for 0.13 (#5311)
---
docs/UserGuide/API/Programming-Java-Native-API.md | 1 +
.../UserGuide/API/Programming-Java-Native-API.md | 3 +-
example/doublewrite/pom.xml | 68 +++++++
.../doublewrite/DoubleWriteDegradationRate.java | 75 ++++++++
.../iotdb/doublewrite/DoubleWriteDurability.java | 64 ++++++
.../apache/iotdb/doublewrite/DoubleWriteEmpty.java | 19 +-
.../iotdb/doublewrite/DoubleWriteExample.java | 76 ++++++++
.../iotdb/doublewrite/DoubleWriteThread.java | 98 ++++++++++
.../apache/iotdb/doublewrite/DoubleWriteUtil.java | 115 +++++++++++
example/pom.xml | 1 +
.../java/org/apache/iotdb/SessionPoolExample.java | 42 ++--
integration/pom.xml | 7 +
.../session/IoTDBSessionDisableMemControlIT.java | 0
.../session/IoTDBSessionVectorABDeviceIT.java | 0
.../session/IoTDBSessionVectorAggregationIT.java | 0
.../IoTDBSessionVectorAggregationWithUnSeqIT.java | 0
.../iotdb/session/IoTDBSessionVectorInsertIT.java | 0
.../apache/iotdb/session/SessionCacheLeaderUT.java | 0
.../java/org/apache/iotdb/session/SessionTest.java | 0
.../apache/iotdb/session/pool/SessionPoolTest.java | 0
.../apache/iotdb/session/template/TemplateUT.java | 0
.../apache/iotdb/session/util/ThreadUtilsTest.java | 0
server/pom.xml | 5 +
.../resources/conf/iotdb-engine.properties | 57 ++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 124 ++++++++++++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 3 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 55 ++++++
.../iotdb/db/doublewrite/DoubleWriteConsumer.java | 103 ++++++++++
.../db/doublewrite/DoubleWriteEProtector.java | 81 ++++++++
.../db/doublewrite/DoubleWriteLogService.java | 214 +++++++++++++++++++++
.../db/doublewrite/DoubleWriteNIProtector.java | 54 ++++++
.../db/doublewrite/DoubleWritePlanTypeUtils.java | 54 ++++++
.../iotdb/db/doublewrite/DoubleWriteProducer.java | 54 ++++++
.../iotdb/db/doublewrite/DoubleWriteProtector.java | 169 ++++++++++++++++
.../iotdb/db/doublewrite/DoubleWriteTask.java | 85 ++++++++
.../db/service/thrift/impl/TSServiceImpl.java | 165 +++++++++++++++-
.../db/doublewrite/DoubleWriteManualTestUtils.java | 107 +++++++++++
session/pom.xml | 13 --
.../java/org/apache/iotdb/session/Session.java | 86 ++++++---
.../apache/iotdb/session/SessionConnection.java | 20 ++
.../org/apache/iotdb/session/pool/SessionPool.java | 201 +++++++++++++++++--
thrift/src/main/thrift/rpc.thrift | 8 +
42 files changed, 2147 insertions(+), 80 deletions(-)
diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md
b/docs/UserGuide/API/Programming-Java-Native-API.md
index 90b4c1e4a0..260ef3c76a 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -514,6 +514,7 @@ If you can not get a session connection in 60 seconds,
there is a warning log bu
If a session has finished an operation, it will be put back to the pool
automatically.
If a session connection is broken, the session will be removed automatically
and the pool will try
to create a new session and redo the operation.
+You can also specify an url list of multiple reachable nodes when creating a
SessionPool, just as you would when creating a Session. To ensure high
availability of clients in distributed cluster.
For query operations:
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index 5edd930d1f..1f7e378504 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -498,7 +498,8 @@ void testInsertTablets(Map<String, Tablet> tablets)
如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
当一个连接被用完后,他会自动返回池中等待下次被使用;
-当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作;
+你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。
对于查询操作:
diff --git a/example/doublewrite/pom.xml b/example/doublewrite/pom.xml
new file mode 100644
index 0000000000..238f43224f
--- /dev/null
+++ b/example/doublewrite/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-examples</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.13.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>doublewrite-example</artifactId>
+ <name>double-write-example</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+</project>
diff --git
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
new file mode 100644
index 0000000000..df3e195cff
--- /dev/null
+++
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+/**
+ * This is a double write insertion degradation rate test java class, which
shows the performance
+ * impact when enable double write feature. You can run this code in the same
way as
+ * DoubleWriteExample.java. Since IoTDB-A enable the double write feature, the
performance impact is
+ * correct only when Both IoTDB-A and IoTDB-B run on the same computer. Or you
can modify the
+ * default configuration of IoTDB-A and IoTDB-B after becoming familiar with
DoubleWriteExample,
+ * DoubleWriteUtil to get A more accurate performance impact estimate from two
remote computers.
+ */
+public class DoubleWriteDegradationRate extends DoubleWriteUtil {
+
+ private static final String dA = "d0";
+ private static final String dB = "d1";
+
+ /**
+ * The following three fields are insert configuration parameters. The
double write feature
+ * already applies to all write interfaces, so you are free to modify these
parameters.
+ */
+ // Total insertion requests during test
+ private static final int batchCnt = 100000;
+ // The insertion timeseries count per timestamp
+ private static final int timeseriesCnt = 100;
+ // The insertion rows count per request
+ private static final int batchSize = 1;
+
+ public static void main(String[] args) throws Exception {
+ initEnvironment();
+ initSessionPool(dA, dB, batchCnt, timeseriesCnt, batchSize);
+ insertData();
+ cleanEnvironment();
+ }
+
+ private static void insertData() throws InterruptedException {
+ // do insertion while measuring performance
+ long startTime = System.currentTimeMillis();
+ threadA.start();
+ threadA.join();
+ double doubleWriteCost = System.currentTimeMillis() - startTime;
+
+ startTime = System.currentTimeMillis();
+ threadB.start();
+ threadB.join();
+ double normalWriteCost = System.currentTimeMillis() - startTime;
+
+ // compute performance
+ double total = batchCnt * batchSize;
+ System.out.println("Normal write cost: " + normalWriteCost / 1000.0 + "s");
+ System.out.println("Average: " + normalWriteCost / total + " ms per
insertion");
+ System.out.println("Double write cost: " + doubleWriteCost / 1000.0 + "s");
+ System.out.println("Average: " + doubleWriteCost / total + " ms per
insertion");
+ System.out.println(
+ "Performance degradation rate : "
+ + (doubleWriteCost - normalWriteCost) / normalWriteCost * 100.0
+ + "%");
+ }
+}
diff --git
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
new file mode 100644
index 0000000000..5cfafb4a83
--- /dev/null
+++
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.session.pool.SessionPool;
+
+public class DoubleWriteDurability extends DoubleWriteUtil {
+
+ private static final String dA = "d0";
+ private static final String dB = "d1";
+
+ private static final int batchCnt = 3000;
+ private static final int timeseriesCnt = 1000;
+ private static final int batchSize = 1;
+
+ public static void main(String[] args) throws Exception {
+ initEnvironment();
+
+ sessionPoolA = new SessionPool(ipA, portA, userA, passwordA, concurrency);
+ // Create StorageGroups
+ try {
+ sessionPoolA.deleteStorageGroup(sg);
+ } catch (Exception ignored) {
+ // ignored
+ }
+ sessionPoolA.setStorageGroup(sg);
+
+ // Create double write threads
+ DoubleWriteThread doubleWriteThreadA =
+ new DoubleWriteThread(sessionPoolA, dA, batchCnt, timeseriesCnt,
batchSize);
+ threadA = new Thread(doubleWriteThreadA);
+ DoubleWriteThread doubleWriteThreadB =
+ new DoubleWriteThread(sessionPoolA, dB, batchCnt, timeseriesCnt,
batchSize);
+ threadB = new Thread(doubleWriteThreadB);
+
+ insertData();
+ }
+
+ private static void insertData() throws InterruptedException {
+ threadA.start();
+ threadB.start();
+
+ threadA.join();
+ threadB.join();
+
+ System.out.println("Insertion complete.");
+ }
+}
diff --git
a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
similarity index 62%
copy from
session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
copy to
example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
index 202e60b89b..73ff3a2fee 100644
--- a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
+++
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,20 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.doublewrite;
-package org.apache.iotdb.session.util;
+public class DoubleWriteEmpty extends DoubleWriteUtil {
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.ThreadFactory;
-
-public class ThreadUtilsTest {
-
- @Test
- public void createThreadFactory() {
- ThreadFactory daemonThreadFactory =
ThreadUtils.createThreadFactory("Test", true);
- Thread thread = daemonThreadFactory.newThread(() -> {});
- Assert.assertEquals("Test-0", thread.getName());
+ public static void main(String[] args) {
+ initEnvironment();
}
}
diff --git
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
new file mode 100644
index 0000000000..fc363b1015
--- /dev/null
+++
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+
+/**
+ * This is a simple double write example java class, which shows how to enable
double write feature
+ * and proves that the double write feature performs correctly. To run this
code, you need to start
+ * another IoTDB service. The easiest way to do this is to copy the
+ * ./server/target/iotdb-server-0.12.5-SNAPSHOT folder to any location. Then
change the default port
+ * in iotdb-engine.properties to 6668, and start the service using the
start-server script. Or you
+ * can start IoTDB-B on another computer and modify the configuration of
IoTDB-B in DoubleWriteUtil.
+ * Finally, you can run this code and see double write feature from the
command line.
+ */
+public class DoubleWriteExample extends DoubleWriteUtil {
+
+ private static final String dA = "d0";
+ private static final String dB = "d0";
+
+ private static final int batchCnt = 10;
+ private static final int timeseriesCnt = 1;
+ private static final int batchSize = 1;
+
+ private static final String sql = "select * from root.DOUBLEWRITESG.d0";
+
+ public static void main(String[] args) throws Exception {
+ initEnvironment();
+ initSessionPool(dA, dB, batchCnt, timeseriesCnt, batchSize);
+ insertData();
+ queryData();
+ cleanEnvironment();
+ }
+
+ private static void insertData() throws InterruptedException {
+ threadA.start();
+ threadB.start();
+
+ threadA.join();
+ threadB.join();
+ }
+
+ private static void queryData() throws IoTDBConnectionException,
StatementExecutionException {
+ System.out.println("Data in IoTDB-A:");
+ // select data from IoTDB-A
+ SessionDataSetWrapper wrapper = sessionPoolA.executeQueryStatement(sql);
+ while (wrapper.hasNext()) {
+ System.out.println(wrapper.next());
+ }
+
+ // select data from IoTDB-B
+ System.out.println("Data in IoTDB-B:");
+ wrapper = sessionPoolB.executeQueryStatement(sql);
+ while (wrapper.hasNext()) {
+ System.out.println(wrapper.next());
+ }
+ }
+}
diff --git
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
new file mode 100644
index 0000000000..9e9a2a49ed
--- /dev/null
+++
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Thread for insertion. Normally you don't need to modify this java class. */
+public class DoubleWriteThread implements Runnable {
+
+ private final SessionPool sessionPool;
+
+ private final String sg = "root.DOUBLEWRITESG";
+ private final String deviceId;
+
+ private final int batchCnt;
+ private final int timeseriesCnt;
+ private final int batchSize;
+
+ DoubleWriteThread(
+ SessionPool sessionPool, String deviceId, int batchCnt, int
timeseriesCnt, int batchSize)
+ throws IoTDBConnectionException, StatementExecutionException {
+ this.sessionPool = sessionPool;
+
+ this.deviceId = deviceId;
+
+ this.batchCnt = batchCnt;
+ this.timeseriesCnt = timeseriesCnt;
+ this.batchSize = batchSize;
+
+ for (int i = 0; i < timeseriesCnt; i++) {
+ sessionPool.createTimeseries(
+ sg + "." + deviceId + "." + "s" + i,
+ TSDataType.INT32,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED);
+ }
+ }
+
+ @Override
+ public void run() {
+ long timestamp = 0;
+ for (int i = 0; i < batchCnt; i++) {
+ List<String> deviceList = new ArrayList<>();
+ List<Long> timestampList = new ArrayList<>();
+ List<List<String>> measurementList = new ArrayList<>();
+ List<List<String>> valueList = new ArrayList<>();
+
+ for (int j = 0; j < batchSize; j++) {
+ deviceList.add(sg + "." + deviceId);
+ timestampList.add(timestamp);
+ List<String> measurements = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+ for (int k = 0; k < timeseriesCnt; k++) {
+ measurements.add("s" + k);
+ values.add(String.valueOf(timestamp));
+ }
+ measurementList.add(measurements);
+ valueList.add(values);
+ timestamp += 1;
+ }
+
+ try {
+ if (batchSize == 1) {
+ sessionPool.insertRecord(
+ deviceList.get(0), timestampList.get(0), measurementList.get(0),
valueList.get(0));
+ } else {
+ sessionPool.insertRecords(deviceList, timestampList,
measurementList, valueList);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException ignored)
{
+ // ignored
+ }
+ }
+ }
+}
diff --git
a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
new file mode 100644
index 0000000000..be65248a32
--- /dev/null
+++
b/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.doublewrite;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+/**
+ * This java class is used to create the double write examples environment.
You can set IoTDB-B
+ * config here
+ */
+public abstract class DoubleWriteUtil {
+
+ // IoTDB-A config
+ // Started by EnvironmentUtils, shouldn't be modified
+ protected static final String ipA = "127.0.0.1";
+ protected static final int portA = 6667;
+ protected static final String userA = "root";
+ protected static final String passwordA = "root";
+
+ // IoTDB-B config
+ // You can modify that config in order to connect with IoTDB-B you started
+ protected static final String ipB = "127.0.0.1";
+ protected static final int portB = 6668;
+ protected static final String userB = "root";
+ protected static final String passwordB = "root";
+
+ protected static SessionPool sessionPoolA;
+ protected static SessionPool sessionPoolB;
+ // The sessionPool concurrency
+ protected static final int concurrency = 5;
+
+ // Default name of StorageGroup
+ protected static final String sg = "root.DOUBLEWRITESG";
+
+ // Threads for double write
+ protected static Thread threadA;
+ protected static Thread threadB;
+
+ protected static void initEnvironment() {
+ // Start local IoTDB-A on ip "127.0.0.1", port 6667 and set
enableDoubleWrite
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ config.setEnableDoubleWrite(true);
+ config.setSecondaryAddress(ipB);
+ config.setSecondaryPort(portB);
+ config.setSecondaryUser(userB);
+ config.setSecondaryPassword(passwordB);
+ config.setDoubleWriteMaxLogSize(1024);
+
+ EnvironmentUtils.envSetUp();
+ }
+
+ protected static void initSessionPool(
+ String dA, String dB, int batchCnt, int timeseriesCnt, int batchSize)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // Create sessionPools
+ sessionPoolA = new SessionPool(ipA, portA, userA, passwordA, concurrency);
+ sessionPoolB = new SessionPool(ipB, portB, userB, passwordB, concurrency);
+
+ // Create StorageGroups
+ try {
+ sessionPoolA.deleteStorageGroup(sg);
+ } catch (Exception ignored) {
+ // ignored
+ }
+ try {
+ sessionPoolB.deleteStorageGroup(sg);
+ } catch (Exception ignored) {
+ // ignored
+ }
+ sessionPoolA.setStorageGroup(sg);
+ sessionPoolB.setStorageGroup(sg);
+
+ // Create double write threads
+ DoubleWriteThread doubleWriteThreadA =
+ new DoubleWriteThread(sessionPoolA, dA, batchCnt, timeseriesCnt,
batchSize);
+ threadA = new Thread(doubleWriteThreadA);
+ DoubleWriteThread doubleWriteThreadB =
+ new DoubleWriteThread(sessionPoolB, dB, batchCnt, timeseriesCnt,
batchSize);
+ threadB = new Thread(doubleWriteThreadB);
+ }
+
+ protected static void cleanEnvironment() throws Exception {
+ // Clean StorageGroups, close sessionPools and shut down environment
+ sessionPoolA.deleteStorageGroup("root.DOUBLEWRITESG");
+ sessionPoolB.deleteStorageGroup("root.DOUBLEWRITESG");
+
+ sessionPoolA.close();
+ sessionPoolB.close();
+
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.shutdownDaemon();
+ }
+}
diff --git a/example/pom.xml b/example/pom.xml
index e099ceeb1f..aced1b0801 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -47,6 +47,7 @@
<module>udf</module>
<module>trigger</module>
<module>rabbitmq</module>
+ <module>doublewrite</module>
</modules>
<build>
<pluginManagement>
diff --git
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 230849d25d..23a1895c22 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -32,12 +32,12 @@ import java.util.concurrent.Executors;
public class SessionPoolExample {
- private static SessionPool pool;
+ private static SessionPool sessionPool;
private static ExecutorService service;
- public static void main(String[] args)
- throws StatementExecutionException, IoTDBConnectionException,
InterruptedException {
- pool =
+ /** Build a custom SessionPool for this example */
+ private static void constructCustomSessionPool() {
+ sessionPool =
new SessionPool.Builder()
.host("127.0.0.1")
.port(6667)
@@ -45,13 +45,33 @@ public class SessionPoolExample {
.password("root")
.maxSize(3)
.build();
- service = Executors.newFixedThreadPool(10);
+ }
+ /** Build a redirect-able SessionPool for this example */
+ private static void constructRedirectSessionPool() {
+ List<String> nodeUrls = new ArrayList<>();
+ nodeUrls.add("127.0.0.1:6667");
+ nodeUrls.add("127.0.0.1:6668");
+ sessionPool =
+ new SessionPool.Builder()
+ .nodeUrls(nodeUrls)
+ .user("root")
+ .password("root")
+ .maxSize(3)
+ .build();
+ }
+
+ public static void main(String[] args)
+ throws StatementExecutionException, IoTDBConnectionException,
InterruptedException {
+ // Choose the SessionPool you going to use
+ constructRedirectSessionPool();
+
+ service = Executors.newFixedThreadPool(10);
insertRecord();
queryByRowRecord();
Thread.sleep(1000);
queryByIterator();
- pool.close();
+ sessionPool.close();
service.shutdown();
}
@@ -72,7 +92,7 @@ public class SessionPoolExample {
values.add(1L);
values.add(2L);
values.add(3L);
- pool.insertRecord(deviceId, time, measurements, types, values);
+ sessionPool.insertRecord(deviceId, time, measurements, types, values);
}
}
@@ -82,7 +102,7 @@ public class SessionPoolExample {
() -> {
SessionDataSetWrapper wrapper = null;
try {
- wrapper = pool.executeQueryStatement("select * from
root.sg1.d1");
+ wrapper = sessionPool.executeQueryStatement("select * from
root.sg1.d1");
System.out.println(wrapper.getColumnNames());
System.out.println(wrapper.getColumnTypes());
while (wrapper.hasNext()) {
@@ -92,7 +112,7 @@ public class SessionPoolExample {
e.printStackTrace();
} finally {
// remember to close data set finally!
- pool.closeResultSet(wrapper);
+ sessionPool.closeResultSet(wrapper);
}
});
}
@@ -104,7 +124,7 @@ public class SessionPoolExample {
() -> {
SessionDataSetWrapper wrapper = null;
try {
- wrapper = pool.executeQueryStatement("select * from
root.sg1.d1");
+ wrapper = sessionPool.executeQueryStatement("select * from
root.sg1.d1");
// get DataIterator like JDBC
DataIterator dataIterator = wrapper.iterator();
System.out.println(wrapper.getColumnNames());
@@ -120,7 +140,7 @@ public class SessionPoolExample {
e.printStackTrace();
} finally {
// remember to close data set finally!
- pool.closeResultSet(wrapper);
+ sessionPool.closeResultSet(wrapper);
}
});
}
diff --git a/integration/pom.xml b/integration/pom.xml
index f014163b7c..4ddd7e8c66 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -34,6 +34,13 @@
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-cluster</artifactId>
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
rename to
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
rename to
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
rename to
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
rename to
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
rename to
integration/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
b/integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
rename to
integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionTest.java
b/integration/src/test/java/org/apache/iotdb/session/SessionTest.java
similarity index 100%
rename from session/src/test/java/org/apache/iotdb/session/SessionTest.java
rename to integration/src/test/java/org/apache/iotdb/session/SessionTest.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
rename to
integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
b/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
rename to
integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
diff --git
a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
b/integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
similarity index 100%
rename from
session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
rename to
integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
diff --git a/server/pom.xml b/server/pom.xml
index 0cfec2bd3e..4d58bcdaf8 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -45,6 +45,11 @@
<artifactId>iotdb-antlr</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>tsfile</artifactId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index a4a02484ee..ae448c09b4 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -926,3 +926,60 @@ timestamp_precision=ms
####################
# Datatype: float
# group_by_fill_cache_size_in_mb=1.0
+
+####################
+### Double Write Configuration
+####################
+
+# Is DoubleWrite enable
+# Datatype: boolean
+# enable_double_write=false
+
+# Secondary IoTDB config
+# Datatype: String
+# secondary_address=127.0.0.1
+
+# Datatype: int
+# secondary_port=6668
+
+# Datatype: String
+# secondary_user=root
+
+# Datatype: String
+# secondary_password=root
+
+# The transmitting concurrency size of DoubleWrite SessionPool
+# This parameter should better not exceed the sum of concurrent
DoubleWriteTask or DoubleWriteConsumer and concurrent DoubleWriteProtector
+# Datatype: int
+# double_write_session_concurrency_size = 8
+
+# DoubleWriteLog dir
+# If this property is unset, DoubleWrite module will save the DoubleWriteLogs
in the default relative path directory under the IoTDB folder(i.e.,
%IOTDB_HOME%/data/doublewrite).
+# If it is absolute, DoubleWrite module will save the data in exact location
it points to.
+# If it is relative, DoubleWrite module will save the data in the relative
path directory it indicates under the IoTDB folder.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is absolute. Otherwise, it is relative.
+# double_write_log_dir=data\\doublewrite
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# double_write_log_dir=data/doublewrite
+
+# The validity of each DoubleWriteLog
+# Datatype: int
+# double_write_log_file_validity=30
+
+# The maximum id of DoubleWriteLog
+# Datatype: int
+# double_write_log_file_num = 32767
+
+# The max size of all the DoubleWriteLog. Default is 100GB
+# Datatype: long
+# double_write_max_log_size = 107374182400
+
+# DoubleWriteProducer cache size
+# Datatype: int
+# double_write_producer_cache_size=1024
+
+# DoubleWriteConsumer concurrency size
+# Datatype: int
+# double_write_consumer_concurrency_size=4
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fb195d3542..654bda0be1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -814,6 +814,33 @@ public class IoTDBConfig {
/** Encryption provided class parameter */
private String encryptDecryptProviderParameter;
+ // DoubleWrite Config
+ private boolean enableDoubleWrite = false;
+
+ // Secondary IoTDB
+ private String secondaryAddress = "127.0.0.1";
+ private int secondaryPort = 6668;
+ private String secondaryUser = "root";
+ private String secondaryPassword = "root";
+
+ // The transmitting concurrency size of double write SessionPool
+ private int doubleWriteSessionConcurrencySize = 8;
+
+ // DoubleWriteLog dir
+ private String doubleWriteLogDir =
+ DEFAULT_BASE_DIR + File.separator +
IoTDBConstant.DOUBLEWRITE_FOLDER_NAME;
+ // The validity of each DoubleWriteLog
+ private int doubleWriteLogValidity = 30;
+ // The maximum id of DoubleWriteLog
+ private int doubleWriteLogNum = 32767;
+ // The max size of all the DoubleWriteLog. Default is 100GB
+ private long doubleWriteMaxLogSize = 107374182400L;
+
+ // DoubleWrite InsertPlan cache size
+ private int doubleWriteProducerCacheSize = 1024;
+ // DoubleWriteConsumer concurrency size
+ private int doubleWriteConsumerConcurrencySize = 4;
+
public IoTDBConfig() {
// empty constructor
}
@@ -947,6 +974,7 @@ public class IoTDBConfig {
extDir = addHomeDir(extDir);
udfDir = addHomeDir(udfDir);
triggerDir = addHomeDir(triggerDir);
+ doubleWriteLogDir = addHomeDir(doubleWriteLogDir);
if
(TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS))
{
String hdfsDir = getHdfsDir();
@@ -2551,4 +2579,100 @@ public class IoTDBConfig {
public void setEncryptDecryptProviderParameter(String
encryptDecryptProviderParameter) {
this.encryptDecryptProviderParameter = encryptDecryptProviderParameter;
}
+
+ public boolean isEnableDoubleWrite() {
+ return enableDoubleWrite;
+ }
+
+ public void setEnableDoubleWrite(boolean enableDoubleWrite) {
+ this.enableDoubleWrite = enableDoubleWrite;
+ }
+
+ public String getSecondaryAddress() {
+ return secondaryAddress;
+ }
+
+ public void setSecondaryAddress(String secondaryAddress) {
+ this.secondaryAddress = secondaryAddress;
+ }
+
+ public int getSecondaryPort() {
+ return secondaryPort;
+ }
+
+ public void setSecondaryPort(int secondaryPort) {
+ this.secondaryPort = secondaryPort;
+ }
+
+ public String getSecondaryUser() {
+ return secondaryUser;
+ }
+
+ public void setSecondaryUser(String secondaryUser) {
+ this.secondaryUser = secondaryUser;
+ }
+
+ public String getSecondaryPassword() {
+ return secondaryPassword;
+ }
+
+ public void setSecondaryPassword(String secondaryPassword) {
+ this.secondaryPassword = secondaryPassword;
+ }
+
+ public int getDoubleWriteSessionConcurrencySize() {
+ return doubleWriteSessionConcurrencySize;
+ }
+
+ public void setDoubleWriteSessionConcurrencySize(int
doubleWriteSessionConcurrencySize) {
+ this.doubleWriteSessionConcurrencySize = doubleWriteSessionConcurrencySize;
+ }
+
+ public String getDoubleWriteLogDir() {
+ return doubleWriteLogDir;
+ }
+
+ public void setDoubleWriteLogDir(String doubleWriteLogDir) {
+ this.doubleWriteLogDir = doubleWriteLogDir;
+ }
+
+ public int getDoubleWriteLogValidity() {
+ return doubleWriteLogValidity;
+ }
+
+ public void setDoubleWriteLogValidity(int doubleWriteLogValidity) {
+ this.doubleWriteLogValidity = doubleWriteLogValidity;
+ }
+
+ public int getDoubleWriteLogNum() {
+ return doubleWriteLogNum;
+ }
+
+ public void setDoubleWriteLogNum(int doubleWriteLogNum) {
+ this.doubleWriteLogNum = doubleWriteLogNum;
+ }
+
+ public long getDoubleWriteMaxLogSize() {
+ return doubleWriteMaxLogSize;
+ }
+
+ public void setDoubleWriteMaxLogSize(long doubleWriteMaxLogSize) {
+ this.doubleWriteMaxLogSize = doubleWriteMaxLogSize;
+ }
+
+ public int getDoubleWriteProducerCacheSize() {
+ return doubleWriteProducerCacheSize;
+ }
+
+ public void setDoubleWriteProducerCacheSize(int
doubleWriteProducerCacheSize) {
+ this.doubleWriteProducerCacheSize = doubleWriteProducerCacheSize;
+ }
+
+ public int getDoubleWriteConsumerConcurrencySize() {
+ return doubleWriteConsumerConcurrencySize;
+ }
+
+ public void setDoubleWriteConsumerConcurrencySize(int
doubleWriteConsumerConcurrencySize) {
+ this.doubleWriteConsumerConcurrencySize =
doubleWriteConsumerConcurrencySize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index feab1abd37..848b35e4d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -154,6 +154,9 @@ public class IoTDBConstant {
public static final String UDF_FOLDER_NAME = "udf";
public static final String TRIGGER_FOLDER_NAME = "trigger";
+ // doublewrite folder name
+ public static final String DOUBLEWRITE_FOLDER_NAME = "doublewrite";
+
// mqtt
public static final String ENABLE_MQTT = "enable_mqtt_service";
public static final String MQTT_HOST_NAME = "mqtt_host";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3167a54f7e..40f9a9bc3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -783,6 +783,61 @@ public class IoTDBDescriptor {
"iotdb_server_encrypt_decrypt_provider_parameter",
conf.getEncryptDecryptProviderParameter()));
+ // set DoubleWrite config
+ conf.setEnableDoubleWrite(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_double_write",
String.valueOf(conf.isEnableDoubleWrite()))));
+
+ conf.setSecondaryAddress(
+ properties.getProperty("secondary_address",
conf.getSecondaryAddress()));
+
+ conf.setSecondaryPort(
+ Integer.parseInt(
+ properties.getProperty("secondary_port",
String.valueOf(conf.getSecondaryPort()))));
+
+ conf.setSecondaryUser(properties.getProperty("secondary_user",
conf.getSecondaryUser()));
+
+ conf.setSecondaryPassword(
+ properties.getProperty("secondary_password",
conf.getSecondaryPassword()));
+
+ conf.setDoubleWriteSessionConcurrencySize(
+ Integer.parseInt(
+ properties.getProperty(
+ "double_write_session_concurrency_size",
+
String.valueOf(conf.getDoubleWriteSessionConcurrencySize()))));
+
+ conf.setDoubleWriteLogDir(
+ properties.getProperty("double_write_log_dir",
conf.getDoubleWriteLogDir()));
+
+ conf.setDoubleWriteLogValidity(
+ Integer.parseInt(
+ properties.getProperty(
+ "double_write_log_file_validity",
+ String.valueOf(conf.getDoubleWriteLogValidity()))));
+
+ conf.setDoubleWriteLogNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "double_write_log_file_num",
String.valueOf(conf.getDoubleWriteLogNum()))));
+
+ conf.setDoubleWriteMaxLogSize(
+ Long.parseLong(
+ properties.getProperty(
+ "double_write_max_log_size",
String.valueOf(conf.getDoubleWriteMaxLogSize()))));
+
+ conf.setDoubleWriteProducerCacheSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "double_write_producer_cache_size",
+ String.valueOf(conf.getDoubleWriteProducerCacheSize()))));
+
+ conf.setDoubleWriteConsumerConcurrencySize(
+ Integer.parseInt(
+ properties.getProperty(
+ "double_write_consumer_concurrency_size",
+
String.valueOf(conf.getDoubleWriteConsumerConcurrencySize()))));
+
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
.getConfig()
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
new file mode 100644
index 0000000000..2d0a3da959
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class DoubleWriteConsumer implements Runnable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DoubleWriteConsumer.class);
+
+ private final BlockingQueue<Pair<ByteBuffer,
DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+ doubleWriteQueue;
+ private final SessionPool doubleWriteSessionPool;
+ private final DoubleWriteLogService niLogService;
+
+ public DoubleWriteConsumer(
+ BlockingQueue<Pair<ByteBuffer,
DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+ doubleWriteQueue,
+ SessionPool doubleWriteSessionPool,
+ DoubleWriteLogService niLogService) {
+ this.doubleWriteQueue = doubleWriteQueue;
+ this.doubleWriteSessionPool = doubleWriteSessionPool;
+ this.niLogService = niLogService;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType> head;
+ ByteBuffer headBuffer;
+ DoubleWritePlanTypeUtils.DoubleWritePlanType headType;
+ try {
+ head = doubleWriteQueue.take();
+ headBuffer = head.left;
+ headType = head.right;
+ } catch (InterruptedException e) {
+ LOGGER.error("DoubleWriteConsumer been interrupted: ", e);
+ continue;
+ }
+
+ if (headType == DoubleWritePlanTypeUtils.DoubleWritePlanType.IPlan) {
+ try {
+ // Sleep 10ms when it's I-Plan
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (InterruptedException ignore) {
+ // Ignored
+ }
+ }
+
+ headBuffer.position(0);
+ boolean transmitStatus = false;
+ try {
+ headBuffer.position(0);
+ transmitStatus =
doubleWriteSessionPool.doubleWriteTransmit(headBuffer);
+ } catch (IoTDBConnectionException connectionException) {
+ // warn IoTDBConnectionException and do serialization
+ LOGGER.warn(
+ "DoubleWriteConsumer can't transmit because network failure",
connectionException);
+ } catch (Exception e) {
+ // The PhysicalPlan has internal error, reject transmit
+ LOGGER.error("DoubleWriteConsumer can't transmit", e);
+ continue;
+ }
+
+ if (!transmitStatus) {
+ try {
+ // must set buffer position to limit() before serialization
+ headBuffer.position(headBuffer.limit());
+ niLogService.acquireLogWriter();
+ niLogService.write(headBuffer);
+ } catch (IOException e) {
+ LOGGER.error("DoubleWriteConsumer can't serialize physicalPlan", e);
+ }
+ niLogService.releaseLogWriter();
+ }
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
new file mode 100644
index 0000000000..81840d6eee
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class DoubleWriteEProtector extends DoubleWriteProtector {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DoubleWriteEProtector.class);
+
+ private final SessionPool doubleWriteSessionPool;
+
+ public DoubleWriteEProtector(SessionPool doubleWriteSessionPool) {
+ super();
+ this.doubleWriteSessionPool = doubleWriteSessionPool;
+ }
+
+ @Override
+ protected void preCheck() {
+ // do nothing
+ }
+
+ @Override
+ protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan
physicalPlan) {
+ while (true) {
+ // transmit E-Plan until it's been received
+ boolean transmitStatus = false;
+
+ try {
+ // try double write
+ planBuffer.position(0);
+ transmitStatus =
doubleWriteSessionPool.doubleWriteTransmit(planBuffer);
+ } catch (IoTDBConnectionException connectionException) {
+ // warn IoTDBConnectionException and retry
+ LOGGER.warn("DoubleWriteEProtector can't transmit, retrying...",
connectionException);
+ } catch (Exception e) {
+ // error exception and break
+ LOGGER.error("DoubleWriteEProtector can't transmit", e);
+ break;
+ }
+
+ if (transmitStatus) {
+ break;
+ } else {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ LOGGER.warn("DoubleWriteEProtector is interrupted", e);
+ }
+ }
+ }
+ }
+
+ public boolean isAtWork() {
+ return isProtectorAtWork;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
new file mode 100644
index 0000000000..70df59f663
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DoubleWriteLogService implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DoubleWriteLogService.class);
+
+ private static final String logFileDir =
+ IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogDir();
+ private static final long logFileValidity =
+ IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogValidity() *
1000L;
+ private static final int maxLogFileNum =
+ IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogNum();
+ private static final long maxLogFileSize =
+ IoTDBDescriptor.getInstance().getConfig().getDoubleWriteMaxLogSize();
+
+ private static long currentLogFileSize = 0;
+
+ private final DoubleWriteProtector protector;
+ private final Lock logWriterLock;
+ private final String logFileName;
+ private int logFileID;
+ private long logFileCreateTime;
+ private File logFile;
+ private LogWriter logWriter;
+
+ public DoubleWriteLogService(String logFileName, DoubleWriteProtector
protector) {
+ this.logFileName = logFileName;
+ this.protector = protector;
+
+ this.logWriterLock = new ReentrantLock();
+ this.logFile = null;
+ this.logWriter = null;
+
+ File logDir = new File(logFileDir);
+ if (!logDir.exists()) {
+ if (!logDir.mkdirs()) {
+ LOGGER.error("Can't make DoubleWriteLog file dir: {}",
logDir.getAbsolutePath());
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ // Check if there exists remnant logs
+ List<Integer> logFileIDList = new ArrayList<>();
+ for (int ID = 0; ID < maxLogFileNum; ID++) {
+ File file =
+ SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator +
logFileName + ID);
+ if (file.exists()) {
+ logFileIDList.add(ID);
+ }
+ }
+
+ int firstID = 0;
+ if (logFileIDList.size() > 0) {
+ // Re-transmit the remnant logs
+ for (int i = 0; i < logFileIDList.size() - 1; i++) {
+ if (logFileIDList.get(i + 1) - logFileIDList.get(i) > 1) {
+ firstID = i + 1;
+ break;
+ }
+ }
+
+ for (int i = firstID; i < logFileIDList.size(); i++) {
+ protector.registerLogFile(logFileDir + File.separator + logFileName +
logFileIDList.get(i));
+ }
+ for (int i = 0; i < firstID; i++) {
+ protector.registerLogFile(logFileDir + File.separator + logFileName +
logFileIDList.get(i));
+ }
+
+ int nextID;
+ if (firstID == 0) {
+ nextID = logFileIDList.get(logFileIDList.size() - 1) + 1;
+ } else {
+ nextID = logFileIDList.get(firstID - 1) + 1;
+ }
+ logFileID = nextID % maxLogFileNum;
+ } else {
+ logFileID = 0;
+ }
+
+ while (true) {
+ // Check the validity of logFile
+ logWriterLock.lock();
+ if (logWriter != null && System.currentTimeMillis() - logFileCreateTime
> logFileValidity) {
+ // Submit logFile when it's expired
+ submitLogFile();
+ }
+ logWriterLock.unlock();
+
+ try {
+ // Sleep 10s before next check
+ TimeUnit.SECONDS.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("DoubleWriteLogService been interrupted", e);
+ }
+ }
+ }
+
+ private void submitLogFile() {
+ try {
+ logWriter.force();
+ } catch (IOException e) {
+ LOGGER.error("Can't force logWrite", e);
+ }
+ incLogFileSize(logFile.length());
+
+ for (int retry = 0; retry < 5; retry++) {
+ try {
+ logWriter.close();
+ } catch (IOException e) {
+ LOGGER.warn("Can't close DoubleWriteLog: {}, retrying...",
logFile.getAbsolutePath());
+ try {
+ // Sleep 1s and retry
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ignored) {
+ // Ignore and retry
+ }
+ continue;
+ }
+
+ LOGGER.info("DoubleWriteLog: {} is expired and closed",
logFile.getAbsolutePath());
+ break;
+ }
+
+ protector.registerLogFile(
+ logFileDir
+ + File.separator
+ + logFileName
+ + (logFileID - 1 + maxLogFileNum) % maxLogFileNum);
+
+ logWriter = null;
+ logFile = null;
+ }
+
+ private void createLogFile() {
+ logFile =
+ SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator +
logFileName + logFileID);
+ while (true) {
+ try {
+ if (logFile.createNewFile()) {
+ logFileCreateTime = System.currentTimeMillis();
+ logWriter = new LogWriter(logFile, false);
+ LOGGER.info("Create DoubleWriteLog: {}", logFile.getAbsolutePath());
+ break;
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Can't create DoubleWriteLog: {}, retrying...",
logFile.getAbsolutePath());
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ignored) {
+ // Ignore and retry
+ }
+ }
+ }
+ logFileID = (logFileID + 1) % maxLogFileNum;
+ }
+
+ public static synchronized void incLogFileSize(long size) {
+ currentLogFileSize = currentLogFileSize + size;
+ }
+
+ public void acquireLogWriter() {
+ logWriterLock.lock();
+ }
+
+ public void write(ByteBuffer buffer) throws IOException {
+ if (currentLogFileSize < maxLogFileSize) {
+ if (logWriter == null) {
+ // Create logFile when there are no valid
+ createLogFile();
+ }
+ logWriter.write(buffer);
+ }
+ }
+
+ public void releaseLogWriter() {
+ logWriterLock.unlock();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
new file mode 100644
index 0000000000..c6c0fd09e8
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class DoubleWriteNIProtector extends DoubleWriteProtector {
+
+ private final DoubleWriteEProtector eProtector;
+ private final DoubleWriteProducer producer;
+
+ public DoubleWriteNIProtector(DoubleWriteEProtector eProtector,
DoubleWriteProducer producer) {
+ super();
+ this.eProtector = eProtector;
+ this.producer = producer;
+ }
+
+ @Override
+ protected void preCheck() {
+ while (eProtector.isAtWork()) {
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException ignore) {
+ // ignore and retry
+ }
+ }
+ }
+
+ @Override
+ protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan
physicalPlan) {
+ producer.put(
+ new Pair<>(planBuffer,
DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan)));
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
new file mode 100644
index 0000000000..3573d5f973
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+
+public class DoubleWritePlanTypeUtils {
+
+ public static DoubleWritePlanType getDoubleWritePlanType(PhysicalPlan plan) {
+ if (plan instanceof SetStorageGroupPlan
+ || plan instanceof DeleteStorageGroupPlan
+ || plan instanceof CreateTimeSeriesPlan
+ || plan instanceof CreateMultiTimeSeriesPlan
+ || plan instanceof CreateAlignedTimeSeriesPlan
+ || plan instanceof DeleteTimeSeriesPlan) {
+ return DoubleWritePlanType.EPlan;
+ } else if (plan instanceof DeletePlan) {
+ return DoubleWritePlanType.IPlan;
+ } else if (plan instanceof InsertPlan) {
+ return DoubleWritePlanType.NPlan;
+ }
+ return null;
+ }
+
+ public enum DoubleWritePlanType {
+ EPlan,
+ IPlan,
+ NPlan
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
new file mode 100644
index 0000000000..e024487ba2
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * DoubleWriteProducer using BlockingQueue to cache PhysicalPlan. And persist
some PhysicalPlan when
+ * they are too many to transmit
+ */
+public class DoubleWriteProducer {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DoubleWriteProducer.class);
+
+ private final BlockingQueue<Pair<ByteBuffer,
DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+ doubleWriteQueue;
+
+ public DoubleWriteProducer(
+ BlockingQueue<Pair<ByteBuffer,
DoubleWritePlanTypeUtils.DoubleWritePlanType>>
+ doubleWriteQueue) {
+ this.doubleWriteQueue = doubleWriteQueue;
+ }
+
+ public void put(Pair<ByteBuffer,
DoubleWritePlanTypeUtils.DoubleWritePlanType> planPair) {
+ try {
+ planPair.left.position(0);
+ doubleWriteQueue.put(planPair);
+ } catch (InterruptedException e) {
+ LOGGER.error("double write cache failed.", e);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
new file mode 100644
index 0000000000..6eb00355c4
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.writelog.io.SingleFileLogReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class DoubleWriteProtector implements Runnable {
+
+ protected static final Logger LOGGER =
LoggerFactory.getLogger(DoubleWriteProtector.class);
+ protected static final int logFileValidity =
+ IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogValidity();
+
+ // For transmit log files
+ protected final Lock logFileListLock;
+ protected List<String> registeredLogFiles;
+ protected List<String> processingLogFiles;
+
+ // For serialize PhysicalPlan
+ private static final int MAX_PHYSICALPLAN_SIZE = 16 * 1024 * 1024;
+ protected final ByteArrayOutputStream protectorByteStream;
+ protected final DataOutputStream protectorDeserializeStream;
+
+ // Working state
+ protected volatile boolean isProtectorAtWork;
+
+ protected DoubleWriteProtector() {
+ logFileListLock = new ReentrantLock();
+ registeredLogFiles = new ArrayList<>();
+
+ protectorByteStream = new ByteArrayOutputStream(MAX_PHYSICALPLAN_SIZE);
+ protectorDeserializeStream = new DataOutputStream(protectorByteStream);
+
+ isProtectorAtWork = false;
+ }
+
+ protected void registerLogFile(String logFile) {
+ logFileListLock.lock();
+ registeredLogFiles.add(logFile);
+ logFileListLock.unlock();
+ }
+
+ protected void wrapLogFiles() {
+ processingLogFiles = new ArrayList<>(registeredLogFiles);
+ registeredLogFiles = new ArrayList<>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ while (true) {
+ // Wrap and transmit all DoubleWriteLogs
+ logFileListLock.lock();
+ if (registeredLogFiles.size() > 0) {
+ isProtectorAtWork = true;
+ wrapLogFiles();
+ logFileListLock.unlock();
+ } else {
+ isProtectorAtWork = false;
+ logFileListLock.unlock();
+ break;
+ }
+ if (isProtectorAtWork) {
+ transmitLogFiles();
+ }
+ }
+
+ try {
+ // Sleep a while before next check
+ TimeUnit.SECONDS.sleep(logFileValidity);
+ } catch (InterruptedException e) {
+ LOGGER.warn("DoubleWriteProtector been interrupted", e);
+ }
+ }
+ }
+
+ protected void transmitLogFiles() {
+ preCheck();
+ for (String logFileName : processingLogFiles) {
+ File logFile = SystemFileFactory.INSTANCE.getFile(logFileName);
+ SingleFileLogReader logReader;
+ try {
+ logReader = new SingleFileLogReader(logFile);
+ } catch (FileNotFoundException e) {
+ LOGGER.error(
+ "DoubleWriteProtector can't open DoubleWriteLog: {}, discarded",
+ logFile.getAbsolutePath(),
+ e);
+ continue;
+ }
+
+ while (logReader.hasNext()) {
+ // read and re-serialize the PhysicalPlan
+ PhysicalPlan nextPlan = logReader.next();
+ try {
+ nextPlan.serialize(protectorDeserializeStream);
+ } catch (IOException e) {
+ LOGGER.error("DoubleWriteProtector can't serialize PhysicalPlan", e);
+ continue;
+ }
+ ByteBuffer nextBuffer =
ByteBuffer.wrap(protectorByteStream.toByteArray());
+ protectorByteStream.reset();
+ transmitPhysicalPlan(nextBuffer, nextPlan);
+ }
+
+ logReader.close();
+ try {
+ // sleep one second then delete DoubleWriteLog
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ LOGGER.warn("DoubleWriteProtector is interrupted", e);
+ }
+
+ DoubleWriteLogService.incLogFileSize(-logFile.length());
+
+ boolean deleted = false;
+ for (int retryCnt = 0; retryCnt < 5; retryCnt++) {
+ if (logFile.delete()) {
+ deleted = true;
+ LOGGER.info("DoubleWriteLog: {} is deleted.",
logFile.getAbsolutePath());
+ break;
+ } else {
+ LOGGER.warn("Delete DoubleWriteLog: {} failed. Retrying",
logFile.getAbsolutePath());
+ }
+ }
+ if (!deleted) {
+ DoubleWriteLogService.incLogFileSize(logFile.length());
+ LOGGER.error("Couldn't delete DoubleWriteLog: {}",
logFile.getAbsolutePath());
+ }
+ }
+ }
+
+ protected abstract void preCheck();
+
+ protected abstract void transmitPhysicalPlan(ByteBuffer planBuffer,
PhysicalPlan physicalPlan);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
new file mode 100644
index 0000000000..3bcef6bf58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** DoubleWriteTask is used for transmit one E-Plan sending by a client */
+public class DoubleWriteTask implements Runnable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DoubleWriteTask.class);
+
+ private final ByteBuffer physicalPlanBuffer;
+ private final SessionPool doubleWriteSessionPool;
+ private final DoubleWriteEProtector eProtector;
+ private final DoubleWriteLogService eLogService;
+
+ public DoubleWriteTask(
+ ByteBuffer physicalPlanBuffer,
+ SessionPool doubleWriteSessionPool,
+ DoubleWriteEProtector eProtector,
+ DoubleWriteLogService eLogService) {
+ this.physicalPlanBuffer = physicalPlanBuffer;
+ this.doubleWriteSessionPool = doubleWriteSessionPool;
+ this.eProtector = eProtector;
+ this.eLogService = eLogService;
+ }
+
+ @Override
+ public void run() {
+ if (eProtector.isAtWork()) {
+ serializeEPlan();
+ } else {
+ boolean transmitStatus = false;
+ try {
+ physicalPlanBuffer.position(0);
+ transmitStatus =
doubleWriteSessionPool.doubleWriteTransmit(physicalPlanBuffer);
+ } catch (IoTDBConnectionException connectionException) {
+ // warn IoTDBConnectionException and do serialization
+ LOGGER.warn("DoubleWriteTask can't transmit because network failure",
connectionException);
+ } catch (Exception e) {
+ // The PhysicalPlan has internal error, reject transmit
+ LOGGER.error("DoubleWriteTask can't transmit", e);
+ return;
+ }
+ if (!transmitStatus) {
+ serializeEPlan();
+ }
+ }
+ }
+
+ private void serializeEPlan() {
+ // serialize the E-Plan if necessary
+ try {
+ // must set buffer position to limit() before serialization
+ physicalPlanBuffer.position(physicalPlanBuffer.limit());
+ eLogService.acquireLogWriter();
+ eLogService.write(physicalPlanBuffer);
+ } catch (IOException e) {
+ LOGGER.error("can't serialize current PhysicalPlan", e);
+ }
+ eLogService.releaseLogWriter();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3f9dce62b3..80ffcbb14f 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,6 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.doublewrite.DoubleWriteConsumer;
+import org.apache.iotdb.db.doublewrite.DoubleWriteEProtector;
+import org.apache.iotdb.db.doublewrite.DoubleWriteLogService;
+import org.apache.iotdb.db.doublewrite.DoubleWriteNIProtector;
+import org.apache.iotdb.db.doublewrite.DoubleWritePlanTypeUtils;
+import org.apache.iotdb.db.doublewrite.DoubleWriteProducer;
+import org.apache.iotdb.db.doublewrite.DoubleWriteTask;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -106,6 +113,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
@@ -120,6 +128,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.pool.SessionPool;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -127,11 +136,14 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -142,6 +154,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -302,9 +316,61 @@ public class TSServiceImpl implements TSIService.Iface {
protected final ServiceProvider serviceProvider;
+ /* Double write module */
+ private static final boolean isEnableDoubleWrite =
+ IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite();
+ private final SessionPool doubleWriteSessionPool;
+ private final DoubleWriteProducer doubleWriteProducer;
+ private final DoubleWriteEProtector doubleWriteEProtector;
+ private final DoubleWriteLogService doubleWriteELogService;
+
public TSServiceImpl() {
super();
serviceProvider = IoTDB.serviceProvider;
+
+ if (isEnableDoubleWrite) {
+ /* Open double write */
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // create SessionPool for double write
+ doubleWriteSessionPool =
+ new SessionPool(
+ config.getSecondaryAddress(),
+ config.getSecondaryPort(),
+ config.getSecondaryUser(),
+ config.getSecondaryPassword(),
+ 5);
+
+ // create DoubleWriteEProtector and DoubleWriteELogService
+ doubleWriteEProtector = new
DoubleWriteEProtector(doubleWriteSessionPool);
+ new Thread(doubleWriteEProtector).start();
+ doubleWriteELogService = new DoubleWriteLogService("ELog",
doubleWriteEProtector);
+ new Thread(doubleWriteELogService).start();
+
+ // create DoubleWriteProducer
+ BlockingQueue<Pair<ByteBuffer,
DoubleWritePlanTypeUtils.DoubleWritePlanType>> blockingQueue =
+ new ArrayBlockingQueue<>(config.getDoubleWriteProducerCacheSize());
+ doubleWriteProducer = new DoubleWriteProducer(blockingQueue);
+
+ // create DoubleWriteNIProtector and DoubleWriteNILogService
+ DoubleWriteNIProtector doubleWriteNIProtector =
+ new DoubleWriteNIProtector(doubleWriteEProtector,
doubleWriteProducer);
+ new Thread(doubleWriteNIProtector).start();
+ DoubleWriteLogService doubleWriteNILogService =
+ new DoubleWriteLogService("NILog", doubleWriteNIProtector);
+ new Thread(doubleWriteNILogService).start();
+
+ // create DoubleWriteConsumer
+ for (int i = 0; i < config.getDoubleWriteConsumerConcurrencySize(); i++)
{
+ DoubleWriteConsumer consumer =
+ new DoubleWriteConsumer(blockingQueue, doubleWriteSessionPool,
doubleWriteNILogService);
+ new Thread(consumer).start();
+ }
+ } else {
+ doubleWriteSessionPool = null;
+ doubleWriteProducer = null;
+ doubleWriteEProtector = null;
+ doubleWriteELogService = null;
+ }
}
@Override
@@ -1484,7 +1550,12 @@ public class TSServiceImpl implements TSIService.Iface {
req.values,
req.isAligned);
TSStatus status = serviceProvider.checkAuthority(plan,
req.getSessionId());
- return status != null ? status : executeNonQueryPlan(plan);
+
+ if (status != null) {
+ return status;
+ }
+
+ return executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORD,
e.getErrorCode());
} catch (Exception e) {
@@ -1515,7 +1586,12 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
TSStatus status = serviceProvider.checkAuthority(plan,
req.getSessionId());
- return status != null ? status : executeNonQueryPlan(plan);
+
+ if (status != null) {
+ return status;
+ }
+
+ return executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_STRING_RECORD,
e.getErrorCode());
} catch (Exception e) {
@@ -1571,7 +1647,11 @@ public class TSServiceImpl implements TSIService.Iface {
insertTabletPlan.setAligned(req.isAligned);
TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
req.getSessionId());
- return status != null ? status : executeNonQueryPlan(insertTabletPlan);
+ if (status != null) {
+ return status;
+ }
+
+ return executeNonQueryPlan(insertTabletPlan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_TABLET,
e.getErrorCode());
} catch (Exception e) {
@@ -2057,7 +2137,86 @@ public class TSServiceImpl implements TSIService.Iface {
return status != null ? status : executeNonQueryPlan(plan);
}
+ @Override
+ public TSStatus executeDoubleWrite(TSInternalSyncWriteReq req) {
+ PhysicalPlan physicalPlan = null;
+ try {
+ ByteBuffer planBuffer = req.physicalPlan;
+ planBuffer.position(0);
+ physicalPlan = PhysicalPlan.Factory.create(req.physicalPlan);
+ } catch (IllegalPathException | IOException e) {
+ LOGGER.error("double write deserialization failed.", e);
+ }
+
+ DoubleWritePlanTypeUtils.DoubleWritePlanType planType =
+ DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan);
+ if (planType == null) {
+ LOGGER.error(
+ "DoubleWrite receive unsupported PhysicalPlan type: {}",
physicalPlan.getOperatorName());
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
+ // LOGGER.info("DoubleWrite receive:{}",
physicalPlan.getPaths().toString());
+
+ try {
+ return serviceProvider.executeNonQuery(physicalPlan)
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully")
+ : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } catch (Exception e) {
+ return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
+ }
+ }
+
+ private void transmitDoubleWrite(PhysicalPlan physicalPlan) {
+
+ DoubleWritePlanTypeUtils.DoubleWritePlanType planType =
+ DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan);
+ if (planType == null) {
+ // Don't need DoubleWrite
+ return;
+ }
+
+ // LOGGER.info("DoubleWrite transmit: {}",
physicalPlan.getPaths().toString());
+
+ // serialize physical plan
+ ByteBuffer buffer;
+ try {
+ int size = physicalPlan.getSerializedSize();
+ ByteArrayOutputStream doubleWriteByteStream = new
ByteArrayOutputStream(size);
+ DataOutputStream doubleWriteSerializeStream = new
DataOutputStream(doubleWriteByteStream);
+ physicalPlan.serialize(doubleWriteSerializeStream);
+ buffer = ByteBuffer.wrap(doubleWriteByteStream.toByteArray());
+ } catch (IOException e) {
+ LOGGER.error("DoubleWrite can't serialize PhysicalPlan", e);
+ return;
+ }
+
+ switch (planType) {
+ case EPlan:
+ // Create DoubleWriteTask and wait
+ Thread taskThread =
+ new Thread(
+ new DoubleWriteTask(
+ buffer, doubleWriteSessionPool, doubleWriteEProtector,
doubleWriteELogService));
+ taskThread.start();
+ try {
+ taskThread.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("DoubleWriteTask been interrupted", e);
+ }
+ break;
+ case IPlan:
+ case NPlan:
+ // Put into DoubleWriteProducer
+ doubleWriteProducer.put(new Pair<>(buffer, planType));
+ }
+ }
+
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ if (isEnableDoubleWrite) {
+ // DoubleWrite should transmit before execute
+ transmitDoubleWrite(plan);
+ }
+
try {
return serviceProvider.executeNonQuery(plan)
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully")
diff --git
a/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
b/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
new file mode 100644
index 0000000000..d7cb8d9b62
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
@@ -0,0 +1,107 @@
+package org.apache.iotdb.db.doublewrite;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class DoubleWriteManualTestUtils {
+
+ private static final SessionPool sessionPool =
+ new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+
+ private static final String sg = "root.sg";
+ private static final int sgCnt = 10;
+ private static final String d = ".d";
+ private static final int dCnt = 20;
+ private static final String s = ".s";
+ private static final int sCnt = 100;
+ private static final int dataCnt = 1000;
+
+ public void setStorageGroups() throws IoTDBConnectionException,
StatementExecutionException {
+ for (int i = 0; i < sgCnt; i++) {
+ sessionPool.setStorageGroup(sg + i);
+ }
+ }
+
+ public void deleteStorageGroups() throws IoTDBConnectionException,
StatementExecutionException {
+ for (int i = 0; i < sgCnt; i++) {
+ sessionPool.deleteStorageGroups(Collections.singletonList(sg + i));
+ }
+ }
+
+ public void createTimeSeries() throws IoTDBConnectionException,
StatementExecutionException {
+ for (int i = 0; i < sgCnt; i++) {
+ String SG = sg + i;
+ for (int j = 0; j < dCnt; j++) {
+ String D = d + j;
+ for (int k = 0; k < sCnt; k++) {
+ String S = s + k;
+ sessionPool.createTimeseries(
+ SG + D + S, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED);
+ }
+ }
+ }
+ }
+
+ public void deleteTimeSeries() throws IoTDBConnectionException,
StatementExecutionException {
+ for (int i = 0; i < sgCnt; i++) {
+ String SG = sg + i;
+ for (int j = 0; j < dCnt; j++) {
+ String D = d + j;
+ for (int k = 0; k < sCnt; k++) {
+ String S = s + k;
+ sessionPool.deleteTimeseries(SG + D + S);
+ }
+ }
+ }
+ }
+
+ public void insertData() throws IoTDBConnectionException,
StatementExecutionException {
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < sgCnt; i++) {
+ String SG = sg + i;
+ for (int j = 0; j < dCnt; j++) {
+ String D = d + j;
+ String device = SG + D;
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ for (int k = 0; k < sCnt; k++) {
+ measurements.add("s" + k);
+ types.add(TSDataType.INT32);
+ }
+ for (int l = 0; l < dataCnt; l++) {
+ List<Object> values = new ArrayList<>();
+ for (int k = 0; k < sCnt; k++) {
+ values.add(l);
+ }
+ sessionPool.insertRecord(device, l, measurements, types, values);
+ }
+ }
+ }
+ long endTime = System.currentTimeMillis();
+ System.out.println(
+ "Avg time per insert: "
+ + ((endTime - startTime) / (double) (sgCnt + dCnt + dataCnt))
+ + "ms");
+ }
+
+ public void deleteData() throws IoTDBConnectionException,
StatementExecutionException {
+ for (int i = 0; i < sgCnt; i++) {
+ String SG = sg + i;
+ for (int j = 0; j < dCnt; j++) {
+ String D = d + j;
+ for (int k = 0; k < sCnt; k++) {
+ String S = s + k;
+ sessionPool.deleteData(Collections.singletonList(SG + D + S), 0,
dataCnt);
+ }
+ }
+ }
+ }
+}
diff --git a/session/pom.xml b/session/pom.xml
index 634ad7c685..7072a7c426 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -71,19 +71,6 @@
<artifactId>service-rpc</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9a31211403..64555f584e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
@@ -1022,8 +1023,11 @@ public class Session {
genTSInsertStringRecordsReq(deviceIds, times, measurementsList,
valuesList, false);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1056,8 +1060,11 @@ public class Session {
genTSInsertStringRecordsReq(deviceIds, times, measurementsList,
valuesList, true);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1139,8 +1146,11 @@ public class Session {
genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList,
valuesList, false);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1174,8 +1184,11 @@ public class Session {
genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList,
valuesList, true);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1657,8 +1670,11 @@ public class Session {
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted,
false);
try {
defaultSessionConnection.insertTablets(request);
- } catch (RedirectException ignored) {
- // ignored
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1692,8 +1708,11 @@ public class Session {
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted,
true);
try {
defaultSessionConnection.insertTablets(request);
- } catch (RedirectException ignored) {
- // ignored
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -2442,6 +2461,24 @@ public class Session {
}
}
+ /** Transmit insert record request for double write */
+ public void doubleWriteTransmit(ByteBuffer buffer)
+ throws IoTDBConnectionException, StatementExecutionException {
+ try {
+ TSInternalSyncWriteReq request = genTSExecuteDoubleWriteReq(buffer);
+ defaultSessionConnection.executeDoubleWrite(request);
+ } catch (RedirectException e) {
+ // ignored
+ }
+ }
+
+ private TSInternalSyncWriteReq genTSExecuteDoubleWriteReq(ByteBuffer buffer)
{
+ TSInternalSyncWriteReq request = new TSInternalSyncWriteReq();
+ request.setDoubleWriteType((byte) 0);
+ request.setPhysicalPlan(buffer);
+ return request;
+ }
+
public boolean isEnableQueryRedirection() {
return enableQueryRedirection;
}
@@ -2470,7 +2507,7 @@ public class Session {
private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
private Version version = Config.DEFAULT_VERSION;
- List<String> nodeUrls = null;
+ private List<String> nodeUrls = null;
public Builder host(String host) {
this.host = host;
@@ -2535,16 +2572,19 @@ public class Session {
}
if (nodeUrls != null) {
- return new Session(
- nodeUrls,
- username,
- password,
- fetchSize,
- zoneId,
- thriftDefaultBufferSize,
- thriftMaxFrameSize,
- enableCacheLeader,
- version);
+ Session newSession =
+ new Session(
+ nodeUrls,
+ username,
+ password,
+ fetchSize,
+ zoneId,
+ thriftDefaultBufferSize,
+ thriftMaxFrameSize,
+ enableCacheLeader,
+ version);
+ newSession.setEnableQueryRedirection(true);
+ return newSession;
}
return new Session(
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 930f5baaa1..9ec01d4c7c 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
@@ -942,6 +943,25 @@ public class SessionConnection {
}
}
+ protected void executeDoubleWrite(TSInternalSyncWriteReq request)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ request.setSessionId(sessionId);
+ try {
+
RpcUtils.verifySuccessWithRedirection(client.executeDoubleWrite(request));
+ } catch (TException e) {
+ if (reconnect()) {
+ try {
+ request.setSessionId(sessionId);
+ RpcUtils.verifySuccess(client.executeDoubleWrite(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+ }
+ }
+ }
+
public boolean isEnableRedirect() {
return enableRedirect;
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 900f4185e4..ad8561bab5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
@@ -93,6 +94,9 @@ public class SessionPool {
// whether the queue is closed.
private boolean closed;
+ // Redirect-able SessionPool
+ private final List<String> nodeUrls;
+
public SessionPool(String host, int port, String user, String password, int
maxSize) {
this(
host,
@@ -108,6 +112,20 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(List<String> nodeUrls, String user, String password, int
maxSize) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ false,
+ null,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
public SessionPool(
String host, int port, String user, String password, int maxSize,
boolean enableCompression) {
this(
@@ -124,6 +142,21 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls, String user, String password, int maxSize,
boolean enableCompression) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ enableCompression,
+ null,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
public SessionPool(
String host,
int port,
@@ -146,6 +179,26 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls,
+ String user,
+ String password,
+ int maxSize,
+ boolean enableCompression,
+ boolean enableCacheLeader) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ enableCompression,
+ null,
+ enableCacheLeader,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
public SessionPool(
String host, int port, String user, String password, int maxSize, ZoneId
zoneId) {
this(
@@ -162,6 +215,21 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls, String user, String password, int maxSize, ZoneId
zoneId) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ false,
+ zoneId,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
@SuppressWarnings("squid:S107")
public SessionPool(
String host,
@@ -178,6 +246,7 @@ public class SessionPool {
this.maxSize = maxSize;
this.host = host;
this.port = port;
+ this.nodeUrls = null;
this.user = user;
this.password = password;
this.fetchSize = fetchSize;
@@ -188,6 +257,60 @@ public class SessionPool {
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
+ public SessionPool(
+ List<String> nodeUrls,
+ String user,
+ String password,
+ int maxSize,
+ int fetchSize,
+ long waitToGetSessionTimeoutInMs,
+ boolean enableCompression,
+ ZoneId zoneId,
+ boolean enableCacheLeader,
+ int connectionTimeoutInMs) {
+ this.maxSize = maxSize;
+ this.host = null;
+ this.port = -1;
+ this.nodeUrls = nodeUrls;
+ this.user = user;
+ this.password = password;
+ this.fetchSize = fetchSize;
+ this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
+ this.enableCompression = enableCompression;
+ this.zoneId = zoneId;
+ this.enableCacheLeader = enableCacheLeader;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ }
+
+ private Session constructNewSession() {
+ Session session;
+ if (nodeUrls == null) {
+ // Construct custom Session
+ session =
+ new Session.Builder()
+ .host(host)
+ .port(port)
+ .username(user)
+ .password(password)
+ .fetchSize(fetchSize)
+ .zoneId(zoneId)
+ .enableCacheLeader(enableCacheLeader)
+ .build();
+ } else {
+ // Construct redirect-able Session
+ session =
+ new Session.Builder()
+ .nodeUrls(nodeUrls)
+ .username(user)
+ .password(password)
+ .fetchSize(fetchSize)
+ .zoneId(zoneId)
+ .enableCacheLeader(enableCacheLeader)
+ .build();
+ }
+ return session;
+ }
+
// if this method throws an exception, either the server is broken, or the
ip/port/user/password
// is incorrect.
@SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive
Complexity warning
@@ -254,9 +377,15 @@ public class SessionPool {
if (shouldCreate) {
// create a new one.
if (logger.isDebugEnabled()) {
- logger.debug("Create a new Session {}, {}, {}, {}", host, port, user,
password);
+ if (nodeUrls == null) {
+ logger.debug("Create a new Session {}, {}, {}, {}", host, port,
user, password);
+ } else {
+ logger.debug("Create a new redirect Session {}, {}, {}", nodeUrls,
user, password);
+ }
}
- session = new Session(host, port, user, password, fetchSize, zoneId,
enableCacheLeader);
+
+ session = constructNewSession();
+
try {
session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
@@ -352,7 +481,7 @@ public class SessionPool {
@SuppressWarnings({"squid:S2446"})
private void tryConstructNewSession() {
- Session session = new Session(host, port, user, password, fetchSize,
zoneId, enableCacheLeader);
+ Session session = constructNewSession();
try {
session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
@@ -2129,6 +2258,28 @@ public class SessionPool {
return null;
}
+ /** Transmit insert record request for double write */
+ public boolean doubleWriteTransmit(ByteBuffer buffer)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ buffer.position(0);
+ session.doubleWriteTransmit(buffer);
+ putBack(session);
+ return true;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+
+ return false;
+ }
+
public int getMaxSize() {
return maxSize;
}
@@ -2177,6 +2328,7 @@ public class SessionPool {
private String host = Config.DEFAULT_HOST;
private int port = Config.DEFAULT_PORT;
+ private List<String> nodeUrls = null;
private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
private String user = Config.DEFAULT_USER;
private String password = Config.DEFAULT_PASSWORD;
@@ -2197,6 +2349,11 @@ public class SessionPool {
return this;
}
+ public Builder nodeUrls(List<String> nodeUrls) {
+ this.nodeUrls = nodeUrls;
+ return this;
+ }
+
public Builder maxSize(int maxSize) {
this.maxSize = maxSize;
return this;
@@ -2243,18 +2400,32 @@ public class SessionPool {
}
public SessionPool build() {
- return new SessionPool(
- host,
- port,
- user,
- password,
- maxSize,
- fetchSize,
- waitToGetSessionTimeoutInMs,
- enableCompression,
- zoneId,
- enableCacheLeader,
- connectionTimeoutInMs);
+ if (nodeUrls == null) {
+ return new SessionPool(
+ host,
+ port,
+ user,
+ password,
+ maxSize,
+ fetchSize,
+ waitToGetSessionTimeoutInMs,
+ enableCompression,
+ zoneId,
+ enableCacheLeader,
+ connectionTimeoutInMs);
+ } else {
+ return new SessionPool(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ fetchSize,
+ waitToGetSessionTimeoutInMs,
+ enableCompression,
+ zoneId,
+ enableCacheLeader,
+ connectionTimeoutInMs);
+ }
}
}
}
diff --git a/thrift/src/main/thrift/rpc.thrift
b/thrift/src/main/thrift/rpc.thrift
index 78ccce6db6..2fdeed1847 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -416,6 +416,12 @@ struct TSDropSchemaTemplateReq {
2: required string templateName
}
+struct TSInternalSyncWriteReq {
+ 1: required i64 sessionId
+ 2: required byte doubleWriteType
+ 3: required binary physicalPlan
+}
+
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -506,4 +512,6 @@ service TSIService {
TSStatus unsetSchemaTemplate(1:TSUnsetSchemaTemplateReq req);
TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
+
+ TSStatus executeDoubleWrite(1:TSInternalSyncWriteReq req);
}
\ No newline at end of file