761417898 commented on code in PR #15439:
URL: https://github.com/apache/iotdb/pull/15439#discussion_r2081258135


##########
iotdb-client/client-cpp/src/main/Session.cpp:
##########
@@ -833,16 +802,122 @@ void Session::initNodesSupplier() {
     endPoints.emplace_back(endPoint);
     if (enableAutoFetch) {
         nodesSupplier = NodesSupplier::create(endPoints, username, password);
-    } else {
+    }
+    else {
         nodesSupplier = make_shared<StaticNodesSupplier>(endPoints);
     }
 }
 
 void Session::initDefaultSessionConnection() {
     defaultEndPoint.__set_ip(host);
     defaultEndPoint.__set_port(rpcPort);
-    defaultSessionConnection = make_shared<SessionConnection>(this, 
defaultEndPoint, zoneId, nodesSupplier, 60, 500,
-            sqlDialect, database);
+    defaultSessionConnection = make_shared<SessionConnection>(this, 
defaultEndPoint, zoneId, nodesSupplier, fetchSize,
+                                                              60, 500,
+                                                              sqlDialect, 
database);
+}
+
+void Session::insertStringRecordsWithLeaderCache(vector<string> deviceIds, 
vector<int64_t> times,
+                                                 vector<vector<string>> 
measurementsList,
+                                                 vector<vector<string>> 
valuesList, bool isAligned) {
+    std::unordered_map<std::shared_ptr<SessionConnection>, 
TSInsertStringRecordsReq> recordsGroup;
+    for (int i = 0; i < deviceIds.size(); i++) {
+        auto connection = getSessionConnection(deviceIds[i]);
+        TSInsertStringRecordsReq request;
+        request.__set_sessionId(connection->sessionId);
+        request.__set_prefixPaths(deviceIds);
+        request.__set_timestamps(times);
+        request.__set_measurementsList(measurementsList);
+        request.__set_valuesList(valuesList);
+        request.__set_isAligned(isAligned);
+        recordsGroup.insert(make_pair(connection, request));
+    }
+    std::function<void(std::shared_ptr<SessionConnection>, const 
TSInsertStringRecordsReq&)> consumer =
+        [](const std::shared_ptr<SessionConnection>& c, const 
TSInsertStringRecordsReq& r) {
+        c->insertStringRecords(r);
+    };
+    if (recordsGroup.size() == 1) {
+        insertOnce(recordsGroup, consumer);
+    }
+    else {
+        insertByGroup(recordsGroup, consumer);
+    }
+}
+
+void Session::insertRecordsWithLeaderCache(vector<string> deviceIds, 
vector<int64_t> times,
+                                           vector<vector<string>> 
measurementsList,
+                                           const 
vector<vector<TSDataType::TSDataType>>& typesList,
+                                           vector<vector<char*>> valuesList, 
bool isAligned) {
+    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertRecordsReq> 
recordsGroup;
+    for (int i = 0; i < deviceIds.size(); i++) {
+        auto connection = getSessionConnection(deviceIds[i]);
+        TSInsertRecordsReq request;
+        request.__set_prefixPaths(deviceIds);
+        request.__set_timestamps(times);
+        request.__set_measurementsList(measurementsList);
+        vector<string> bufferList;
+        for (size_t i = 0; i < valuesList.size(); i++) {
+            string buffer;
+            putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+            bufferList.push_back(buffer);
+        }
+        request.__set_valuesList(bufferList);
+        request.__set_isAligned(false);
+        recordsGroup.insert(make_pair(connection, request));
+    }
+    std::function<void(std::shared_ptr<SessionConnection>, const 
TSInsertRecordsReq&)> consumer =
+        [](const std::shared_ptr<SessionConnection>& c, const 
TSInsertRecordsReq& r) {
+        c->insertRecords(r);
+    };
+    if (recordsGroup.size() == 1) {
+        insertOnce(recordsGroup, consumer);
+    }
+    else {
+        insertByGroup(recordsGroup, consumer);
+    }
+}
+
+void Session::insertTabletsWithLeaderCache(unordered_map<string, Tablet*> 
tablets, bool sorted, bool isAligned) {
+    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertTabletsReq> 
tabletGroup;
+    if (tablets.empty()) {
+        throw BatchExecutionException("No tablet is inserting!");
+    }
+    auto beginIter = tablets.begin();
+    bool isFirstTabletAligned = ((*beginIter).second)->isAligned;
+    for (const auto& item : tablets) {
+        TSInsertTabletsReq request;
+        if (isFirstTabletAligned != item.second->isAligned) {
+            throw BatchExecutionException("The tablets should be all aligned 
or non-aligned!");
+        }
+        if (!checkSorted(*(item.second))) {
+            sortTablet(*(item.second));
+        }
+        request.prefixPaths.push_back(item.second->deviceId);
+        vector<string> measurements;
+        vector<int> dataTypes;
+        for (pair<string, TSDataType::TSDataType> schema : 
item.second->schemas) {
+            measurements.push_back(schema.first);
+            dataTypes.push_back(schema.second);
+        }
+        request.measurementsList.push_back(measurements);
+        request.typesList.push_back(dataTypes);
+        
request.timestampsList.push_back(move(SessionUtils::getTime(*(item.second))));
+        
request.valuesList.push_back(move(SessionUtils::getValue(*(item.second))));
+        request.sizeList.push_back(item.second->rowSize);
+        request.__set_isAligned(item.second->isAligned);
+        auto connection = getSessionConnection(item.first);
+        tabletGroup.insert(make_pair(connection, request));
+    }

Review Comment:
   Fixed, and added unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to