This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch geely_car_session_pool in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1fd0af43b09dcf9678997c5392b70d2d489c0163 Author: Jinrui.Zhang <[email protected]> AuthorDate: Fri Feb 3 01:35:55 2023 +0800 add new feature DEVICE_GROUP --- .../org/apache/iotdb/session/pool/SessionPool.java | 45 ++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 0345adbe5a..3664a3d23c 100644 --- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -41,11 +41,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.ZoneId; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; /** * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do not need to consider @@ -116,6 +118,32 @@ public class SessionPool implements ISessionPool { // Redirect-able SessionPool private final List<String> nodeUrls; + public static final int DEVICE_GROUP_NUMBER = 1000; + public static final long DEVICE_GROUP_SIZE = 1000_000; + + private final DeviceGroupDivider deviceGroupDivider = new DeviceGroupDivider(); + + public static class DeviceGroupDivider { + private final AtomicInteger[] deviceCount; + private final Map<String, Integer> deviceIndex; + + public DeviceGroupDivider() { + deviceCount = new AtomicInteger[DEVICE_GROUP_NUMBER]; + for (int i = 0 ; i < DEVICE_GROUP_NUMBER; i ++) { + deviceCount[i] = new AtomicInteger(0); + } + deviceIndex = new ConcurrentHashMap<>(); + } + + public int getGroupIndex(String device) { + return device.hashCode() % DEVICE_GROUP_NUMBER; + } + + public int getDeviceIndex(String device, int groupIndex) { + return deviceIndex.computeIfAbsent(device, d -> deviceCount[groupIndex].incrementAndGet()); + } + } + public SessionPool(String host, int port, String user, String password, int maxSize) { this( host, @@ -814,6 +842,23 @@ public class SessionPool implements ISessionPool { List<List<TSDataType>> typesList, List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException { + + for (int i = 0; i < multiSeriesIds.size() ; i ++) { + String path = multiSeriesIds.get(i); + + String deviceId = path.substring(path.lastIndexOf("\\.") + 1); + + int groupIndex = deviceGroupDivider.getGroupIndex(path); + int deviceIndex = deviceGroupDivider.getDeviceIndex(path, groupIndex); + String newPath = path.substring(0, path.lastIndexOf("\\.") + 1) + "g_" + groupIndex; + multiSeriesIds.set(i, newPath); + + times.set(i, times.get(i) * DEVICE_GROUP_SIZE + deviceIndex); + multiMeasurementComponentsList.get(i).add("deviceId"); + typesList.get(i).add(TSDataType.TEXT); + valuesList.get(i).add(deviceId); + } + for (int i = 0; i < RETRY; i++) { ISession session = getSession(); try {
