This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch support_table_model_redirect in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 73674da61587bad74cc898d53f00e87763097749 Author: HTHou <[email protected]> AuthorDate: Thu Sep 19 18:51:12 2024 +0800 dev session --- .../org/apache/iotdb/rpc/RedirectException.java | 15 ++ .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 12 ++ .../java/org/apache/iotdb/session/Session.java | 164 ++++++++++++++++++++- .../plan/relational/planner/TableModelPlanner.java | 5 +- 4 files changed, 189 insertions(+), 7 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java index 8da65e8249c..81833854c23 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java @@ -22,6 +22,7 @@ package org.apache.iotdb.rpc; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import java.io.IOException; +import java.util.List; import java.util.Map; public class RedirectException extends IOException { @@ -29,17 +30,27 @@ public class RedirectException extends IOException { private final TEndPoint endPoint; private final Map<String, TEndPoint> deviceEndPointMap; + private final List<TEndPoint> deviceEndPointList; public RedirectException(TEndPoint endPoint) { super("later request in same group will be redirected to " + endPoint.toString()); this.endPoint = endPoint; this.deviceEndPointMap = null; + this.deviceEndPointList = null; } public RedirectException(Map<String, TEndPoint> deviceEndPointMap) { super("later request in same group will be redirected to " + deviceEndPointMap); this.endPoint = null; this.deviceEndPointMap = deviceEndPointMap; + this.deviceEndPointList = null; + } + + public RedirectException(List<TEndPoint> deviceEndPointList) { + super("later request in same group will be redirected to " + deviceEndPointList); + this.endPoint = null; + this.deviceEndPointMap = null; + this.deviceEndPointList = deviceEndPointList; } public TEndPoint getEndPoint() { @@ -49,4 +60,8 @@ public class RedirectException extends IOException { public Map<String, TEndPoint> getDeviceEndPointMap() { return deviceEndPointMap; } + + public List<TEndPoint> getDeviceEndPointList() { + return deviceEndPointList; + } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index 1a7bc9970f6..440cbe4fbe9 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -36,6 +36,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -112,6 +113,17 @@ public class RpcUtils { if (status.isSetRedirectNode()) { throw new RedirectException(status.getRedirectNode()); } + if (status.isSetSubStatus()) { // insertRelationalTablet may set subStatus + List<TSStatus> statusSubStatus = status.getSubStatus(); + List<TEndPoint> deviceEndPointList = new ArrayList<>(statusSubStatus.size()); + for (int i = 0; i < statusSubStatus.size(); i++) { + TSStatus subStatus = statusSubStatus.get(i); + if (subStatus.isSetRedirectNode()) { + deviceEndPointList.set(i, subStatus.getRedirectNode()); + } + } + throw new RedirectException(deviceEndPointList); + } } public static void verifySuccessWithRedirectionForMultiDevices( diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 35229b54162..1769337b748 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -63,6 +63,7 @@ import org.apache.iotdb.session.util.ThreadUtils; import org.apache.thrift.TException; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Binary; @@ -164,6 +165,9 @@ public class Session implements ISession { @SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile" protected volatile Map<String, TEndPoint> deviceIdToEndpoint; + @SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile" + protected volatile Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint; + @SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile" protected volatile Map<TEndPoint, SessionConnection> endPointToSessionConnection; @@ -535,6 +539,7 @@ public class Session implements ISession { isClosed = false; if (enableRedirection || enableQueryRedirection) { deviceIdToEndpoint = new ConcurrentHashMap<>(); + tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>(); endPointToSessionConnection = new ConcurrentHashMap<>(); endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection); } @@ -1321,6 +1326,18 @@ public class Session implements ISession { } } + private SessionConnection getSessionConnection(IDeviceID deviceId) { + TEndPoint endPoint; + if (enableRedirection + && tableModelDeviceIdToEndpoint != null + && (endPoint = tableModelDeviceIdToEndpoint.get(deviceId)) != null + && endPointToSessionConnection.containsKey(endPoint)) { + return endPointToSessionConnection.get(endPoint); + } else { + return defaultSessionConnection; + } + } + @Override public String getTimestampPrecision() throws TException { return defaultSessionConnection.getClient().getProperties().getTimestampPrecision(); @@ -1384,6 +1401,34 @@ public class Session implements ISession { } } + private void handleRedirection(IDeviceID deviceId, TEndPoint endpoint) { + if (enableRedirection) { + // no need to redirection + if (endpoint.ip.equals("0.0.0.0")) { + return; + } + AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>(); + if (!tableModelDeviceIdToEndpoint.containsKey(deviceId) + || !tableModelDeviceIdToEndpoint.get(deviceId).equals(endpoint)) { + tableModelDeviceIdToEndpoint.put(deviceId, endpoint); + } + SessionConnection connection = + endPointToSessionConnection.computeIfAbsent( + endpoint, + k -> { + try { + return constructSessionConnection(this, endpoint, zoneId); + } catch (IoTDBConnectionException ex) { + exceptionReference.set(ex); + return null; + } + }); + if (connection == null) { + tableModelDeviceIdToEndpoint.remove(deviceId); + } + } + } + private void handleQueryRedirection(TEndPoint endPoint) throws IoTDBConnectionException { if (enableQueryRedirection) { AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>(); @@ -2679,11 +2724,20 @@ public class Session implements ISession { @Override public void insertRelationalTablet(Tablet tablet, boolean sorted) throws IoTDBConnectionException, StatementExecutionException { - TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false); - request.setWriteToTable(true); - request.setColumnCategories( - tablet.getColumnTypes().stream().map(t -> (byte) t.ordinal()).collect(Collectors.toList())); - insertTabletInternal(tablet, request); + if (enableRedirection) { + insertRelationalTabletWithLeaderCache(tablet); + } else { + TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false); + request.setWriteToTable(true); + request.setColumnCategories( + tablet.getColumnTypes().stream() + .map(t -> (byte) t.ordinal()) + .collect(Collectors.toList())); + try { + defaultSessionConnection.insertTablet(request); + } catch (RedirectException ignored) { + } + } } /** @@ -2697,6 +2751,106 @@ public class Session implements ISession { insertRelationalTablet(tablet, false); } + private void insertRelationalTabletWithLeaderCache(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>(); + + for (int i = 0; i < tablet.rowSize; i++) { + IDeviceID iDeviceID = tablet.getDeviceID(i); + final SessionConnection connection = getSessionConnection(iDeviceID); + int finalI = i; + relationalTabletGroup.compute( + connection, + (k, v) -> { + if (v == null) { + v = + new Tablet( + tablet.getTableName(), + tablet.getSchemas(), + tablet.getColumnTypes(), + tablet.rowSize); + } + for (int j = 0; j < v.getSchemas().size(); j++) { + v.addValue( + v.getSchemas().get(j).getMeasurementId(), finalI, tablet.getValue(finalI, j)); + } + v.addTimestamp(finalI, tablet.timestamps[finalI]); + v.rowSize++; + return v; + }); + } + insertRelationalTabletByGroup(relationalTabletGroup); + } + + @SuppressWarnings({ + "squid:S3776" + }) // ignore Cognitive Complexity of methods should not be too high + private <T> void insertRelationalTabletByGroup( + Map<SessionConnection, Tablet> relationalTabletGroup) + throws IoTDBConnectionException, StatementExecutionException { + List<CompletableFuture<Void>> completableFutures = + relationalTabletGroup.entrySet().stream() + .map( + entry -> { + SessionConnection connection = entry.getKey(); + Tablet subTablet = entry.getValue(); + return CompletableFuture.runAsync( + () -> { + TSInsertTabletReq request = genTSInsertTabletReq(subTablet, false, false); + request.setWriteToTable(true); + request.setColumnCategories( + subTablet.getColumnTypes().stream() + .map(t -> (byte) t.ordinal()) + .collect(Collectors.toList())); + InsertConsumer<TSInsertTabletReq> insertConsumer = + SessionConnection::insertTablet; + try { + insertConsumer.insert(connection, request); + } catch (RedirectException e) { + List<TEndPoint> endPointList = e.getDeviceEndPointList(); + Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>(); + for (int i = 0; i < endPointList.size(); i++) { + if (endPointList.get(i) != null) { + endPointMap.put(subTablet.getDeviceID(i), endPointList.get(i)); + } + } + endPointMap.forEach(this::handleRedirection); + } catch (StatementExecutionException e) { + throw new CompletionException(e); + } catch (IoTDBConnectionException e) { + // remove the broken session + removeBrokenSessionConnection(connection); + try { + insertConsumer.insert(defaultSessionConnection, request); + } catch (IoTDBConnectionException | StatementExecutionException ex) { + throw new CompletionException(ex); + } catch (RedirectException ignored) { + } + } + }, + OPERATION_EXECUTOR); + }) + .collect(Collectors.toList()); + + StringBuilder errMsgBuilder = new StringBuilder(); + for (CompletableFuture<Void> completableFuture : completableFutures) { + try { + completableFuture.join(); + } catch (CompletionException completionException) { + Throwable cause = completionException.getCause(); + logger.error("Meet error when async insert!", cause); + if (cause instanceof IoTDBConnectionException) { + throw (IoTDBConnectionException) cause; + } else { + errMsgBuilder.append(cause.getMessage()); + } + } + } + if (errMsgBuilder.length() > 0) { + throw new StatementExecutionException(errMsgBuilder.toString()); + } + } + /** * insert the aligned timeseries data of a device. For each timestamp, the number of measurements * is the same. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index 762939c7c74..1b90384efc3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -157,8 +157,9 @@ public class TableModelPlanner implements IPlanner { public void setRedirectInfo( IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) { Analysis analysis = (Analysis) iAnalysis; - - assert analysis.getStatement() instanceof WrappedInsertStatement; + if (!(analysis.getStatement() instanceof WrappedInsertStatement)) { + return; + } InsertBaseStatement insertStatement = ((WrappedInsertStatement) analysis.getStatement()).getInnerTreeStatement();
