This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d03bcffa517 implement new session constructor (#16256)
d03bcffa517 is described below
commit d03bcffa517f554662ce9b8179e24e7c8daf794e
Author: Hongzhi Gao <[email protected]>
AuthorDate: Tue Aug 26 18:09:06 2025 +0800
implement new session constructor (#16256)
* implement new session constructor
* cpp client session constructor
---
.../client-cpp/src/example/MultiSvrNodeClient.cpp | 147 +++++++++++++++++++++
.../client-cpp/src/main/AbstractSessionBuilder.h | 4 +
iotdb-client/client-cpp/src/main/Common.cpp | 44 ++++++
iotdb-client/client-cpp/src/main/Common.h | 18 +++
iotdb-client/client-cpp/src/main/NodesSupplier.cpp | 100 +++++++-------
iotdb-client/client-cpp/src/main/NodesSupplier.h | 1 +
iotdb-client/client-cpp/src/main/Session.cpp | 86 ++++++++++--
iotdb-client/client-cpp/src/main/Session.h | 14 +-
.../{TableSessionBuilder.h => SessionBuilder.h} | 68 ++++++----
.../client-cpp/src/main/SessionConnection.cpp | 2 +
.../client-cpp/src/main/SessionConnection.h | 3 +-
.../client-cpp/src/main/TableSessionBuilder.h | 4 +
iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp | 96 ++++++++++++++
.../src/test/cpp/sessionRelationalIT.cpp | 24 ++++
14 files changed, 521 insertions(+), 90 deletions(-)
diff --git a/iotdb-client/client-cpp/src/example/MultiSvrNodeClient.cpp
b/iotdb-client/client-cpp/src/example/MultiSvrNodeClient.cpp
new file mode 100644
index 00000000000..68e3e7230ca
--- /dev/null
+++ b/iotdb-client/client-cpp/src/example/MultiSvrNodeClient.cpp
@@ -0,0 +1,147 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <chrono>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "Session.h"
+#include "SessionBuilder.h"
+#include "SessionDataSet.h"
+#include "TableSessionBuilder.h"
+
+namespace {
+
+// Tree-model demo: builds a Session from three candidate node URLs, makes
+// sure the series root.test.d1.s1 exists, deletes it again and closes the
+// session. Any std::exception is reported on stdout instead of propagating.
+void RunTreeExample() {
+  try {
+    const std::string series_path = "root.test.d1.s1";
+    const std::vector<std::string> cluster_urls{
+        "127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"};
+
+    auto builder = std::make_shared<SessionBuilder>();
+    builder->username("root");
+    builder->password("root");
+    builder->nodeUrls(cluster_urls);
+    std::shared_ptr<Session> tree_session(builder->build());
+
+    tree_session->open();
+    const bool already_present = tree_session->checkTimeseriesExists(series_path);
+    if (!already_present) {
+      tree_session->createTimeseries(series_path, TSDataType::INT64,
+                                     TSEncoding::RLE, CompressionType::SNAPPY);
+    }
+    tree_session->deleteTimeseries(series_path);
+    tree_session->close();
+  } catch (const std::exception& ex) {
+    std::cout << "Caught exception: " << ex.what() << std::endl;
+  }
+}
+
+// Table-model demo: builds a TableSession from three candidate node URLs and
+// (re)creates the databases db1 and db2. Any std::exception is reported on
+// stdout instead of propagating.
+void RunTableExample() {
+  try {
+    const std::vector<std::string> cluster_urls{
+        "127.0.0.1:6669", "127.0.0.1:6668", "127.0.0.1:6667"};
+
+    auto builder = std::make_shared<TableSessionBuilder>();
+    builder->username("root");
+    builder->password("root");
+    builder->nodeUrls(cluster_urls);
+    std::shared_ptr<TableSession> table_session(builder->build());
+
+    table_session->open();
+
+    // Statements are issued in exactly this order: drop-then-create per database.
+    const char* const statements[] = {
+        "DROP DATABASE IF EXISTS db1",
+        "CREATE DATABASE db1",
+        "DROP DATABASE IF EXISTS db2",
+        "CREATE DATABASE db2",
+    };
+    for (const char* sql : statements) {
+      table_session->executeNonQueryStatement(sql);
+    }
+
+    table_session->close();
+  } catch (const std::exception& ex) {
+    std::cout << "Caught exception: " << ex.what() << std::endl;
+  }
+}
+
+
+// Example: continuously write/query data so you can manually stop a node
+// to test client failover behavior.
+//
+// Connects through three candidate node URLs, creates the INT64 series
+// root.resilience.d1.s1 when it is missing, then performs one insert and one
+// query per second for 60 iterations. A failure inside an iteration is printed
+// and the loop carries on, so recovery is visible in the output; a failure
+// outside the loop (build/open/create/close) ends the example with a message.
+void RunResilienceExample() {
+  try {
+    std::vector<std::string> node_urls = {
+        "127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"};
+
+    auto builder = std::make_shared<SessionBuilder>();
+    auto session = std::shared_ptr<Session>(
+        builder->username("root")
+            ->password("root")
+            ->nodeUrls(node_urls)
+            ->build());
+
+    session->open();
+
+    if (!session->checkTimeseriesExists("root.resilience.d1.s1")) {
+      session->createTimeseries("root.resilience.d1.s1", TSDataType::INT64,
+                                TSEncoding::RLE, CompressionType::SNAPPY);
+    }
+
+    std::cout << "Starting resilience test. "
+                 "Stop one node manually to see failover handling..."
+              << std::endl;
+
+    for (int i = 0; i < 60; ++i) {  // run ~60 seconds
+      // Wall-clock time in milliseconds since the epoch.
+      const int64_t timestamp =
+          std::chrono::duration_cast<std::chrono::milliseconds>(
+              std::chrono::system_clock::now().time_since_epoch())
+              .count();
+
+      // The series is declared INT64, so the value slot points at an int64_t
+      // rather than at the decimal text of the number.
+      // NOTE(review): assumes the typed insertRecord overload reads each
+      // char* as the raw bytes of the declared type - confirm against
+      // Session::insertRecord.
+      int64_t value = i;
+
+      try {
+        session->insertRecord("root.resilience.d1", timestamp,
+                              {"s1"}, {TSDataType::INT64},
+                              {reinterpret_cast<char*>(&value)});
+        std::cout << "[Insert] ts=" << timestamp << ", value=" << value
+                  << std::endl;
+
+        auto dataset = session->executeQueryStatement(
+            "SELECT s1 FROM root.resilience.d1 LIMIT 1");
+        std::cout << "[Query] Got dataset: "
+                  << (dataset ? "Success" : "Null") << std::endl;
+
+      } catch (const std::exception& e) {
+        // Keep looping: the point of the demo is to observe recovery.
+        std::cout << "Caught exception during resilience loop: " << e.what()
+                  << std::endl;
+      }
+
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+
+    session->close();
+  } catch (const std::exception& e) {
+    std::cout << "Caught exception in RunResilienceExample: " << e.what()
+              << std::endl;
+  }
+}
+
+} // namespace
+
+int main() {
+ //RunTreeExample();   // disabled by default; uncomment to run the tree-model demo
+ //RunTableExample();  // disabled by default; uncomment to run the table-model demo
+ RunResilienceExample();  // the only example executed as committed
+ return 0;
+}
diff --git a/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
b/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
index 441e3a41f4f..3ec6da9b399 100644
--- a/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
+++ b/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
@@ -30,11 +30,15 @@ public:
std::string password = "root";
std::string zoneId = "";
int fetchSize = 10000;
+ int connectTimeoutMs = 3 * 1000;
+ int maxRetries = 3;
+ int retryDelayMs = 500;
std::string sqlDialect = "tree";
std::string database = "";
bool enableAutoFetch = true;
bool enableRedirections = true;
bool enableRPCCompression = false;
+ std::vector<std::string> nodeUrls;
};
#endif // IOTDB_ABSTRACTSESSIONBUILDER_H
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/Common.cpp
b/iotdb-client/client-cpp/src/main/Common.cpp
index 38e8a31d2ff..cac4448b8e1 100644
--- a/iotdb-client/client-cpp/src/main/Common.cpp
+++ b/iotdb-client/client-cpp/src/main/Common.cpp
@@ -454,3 +454,47 @@ const std::vector<char>& BitMap::getByteArray() const {
size_t BitMap::getSize() const {
return this->size;
}
+
+const std::string UrlUtils::PORT_SEPARATOR = ":";
+const std::string UrlUtils::ABB_COLON = "[";
+
+/**
+ * Split an endpoint URL of the form "host:port" or "[ipv6]:port" into a
+ * TEndPoint.
+ *
+ * - Empty input: a default-constructed TEndPoint is returned.
+ * - No ':' present: the whole input is stored as ip, port is left untouched.
+ * - Text after the last ':' is not a complete decimal number in [0, 65535]
+ *   (e.g. "abc", "80x", or the "1]" of a port-less "[::1]"): the whole input
+ *   is stored as ip, port is left untouched.
+ * - Otherwise ip is the text before the last ':' (with one pair of enclosing
+ *   square brackets removed, if present) and port is the parsed number.
+ *
+ * @param endPointUrl endpoint text, e.g. "127.0.0.1:6667" or "[::1]:6667"
+ * @return the parsed endpoint; never throws for malformed input
+ */
+TEndPoint UrlUtils::parseTEndPointIpv4AndIpv6Url(const std::string& endPointUrl) {
+    TEndPoint endPoint;
+
+    // Return default TEndPoint if input is empty
+    if (endPointUrl.empty()) {
+        return endPoint;
+    }
+
+    const size_t portSeparatorPos = endPointUrl.find_last_of(PORT_SEPARATOR);
+
+    // If no port separator found, treat entire string as IP
+    if (portSeparatorPos == std::string::npos) {
+        endPoint.__set_ip(endPointUrl);
+        return endPoint;
+    }
+
+    const std::string portStr = endPointUrl.substr(portSeparatorPos + 1);
+    std::string ip = endPointUrl.substr(0, portSeparatorPos);
+
+    // Remove surrounding square brackets for IPv6. Anything that is not a
+    // well-formed "[...]" wrapper is kept verbatim.
+    if (ip.size() >= 2 && ip.front() == '[' && ip.back() == ']') {
+        ip = ip.substr(1, ip.size() - 2);
+    }
+
+    // std::stoi stops at the first non-digit, so "80x" would silently become
+    // 80; require that the whole token was consumed and that it is a valid
+    // TCP port.
+    bool portValid = false;
+    int port = 0;
+    try {
+        size_t consumed = 0;
+        port = std::stoi(portStr, &consumed);
+        portValid = consumed == portStr.size() && port >= 0 && port <= 65535;
+    } catch (const std::exception&) {
+        // Not a number, or out of int range: handled by the fallback below.
+    }
+
+    if (portValid) {
+        endPoint.__set_ip(ip);
+        endPoint.__set_port(port);
+    } else {
+        endPoint.__set_ip(endPointUrl);
+    }
+
+    return endPoint;
+}
diff --git a/iotdb-client/client-cpp/src/main/Common.h
b/iotdb-client/client-cpp/src/main/Common.h
index aa583cb6ba8..129e458c67b 100644
--- a/iotdb-client/client-cpp/src/main/Common.h
+++ b/iotdb-client/client-cpp/src/main/Common.h
@@ -481,5 +481,23 @@ public:
static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(const
TSStatus& status);
};
+class UrlUtils {
+private:
+ static const std::string PORT_SEPARATOR;
+ static const std::string ABB_COLON;
+
+ UrlUtils() = delete;
+ ~UrlUtils() = delete;
+
+public:
+ /**
+ * Parse a TEndPoint from an endpoint URL, splitting at the last ':'.
+ * example:[D80:0000:0000:0000:ABAA:0000:00C2:0002]:22227
+ *
+ * @param endPointUrl "ip:port", "host:port" or "[ipv6]:port"
+ * @return the parsed TEndPoint. A default TEndPoint is returned for empty
+ *         input; when there is no ':' or the text after the last ':' cannot
+ *         be parsed as a port, the whole input is stored as ip and port is
+ *         left at its default.
+ */
+ static TEndPoint parseTEndPointIpv4AndIpv6Url(const std::string&
endPointUrl);
+};
#endif
diff --git a/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
b/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
index f9ee90f169a..e5de1d94ebd 100644
--- a/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
+++ b/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
@@ -24,6 +24,7 @@
#include <utility>
const std::string NodesSupplier::SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
+const std::string NodesSupplier::RUNNING_STATUS = "Running";
const std::string NodesSupplier::STATUS_COLUMN_NAME = "Status";
const std::string NodesSupplier::IP_COLUMN_NAME = "RpcAddress";
const std::string NodesSupplier::PORT_COLUMN_NAME = "RpcPort";
@@ -137,6 +138,7 @@ void NodesSupplier::deduplicateEndpoints() {
void NodesSupplier::startBackgroundRefresh(std::chrono::milliseconds interval)
{
isRunning_ = true;
+ refreshEndpointList();
refreshThread_ = std::thread([this, interval] {
while (isRunning_) {
refreshEndpointList();
@@ -149,60 +151,66 @@ void
NodesSupplier::startBackgroundRefresh(std::chrono::milliseconds interval) {
}
std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints() {
+ for (const auto& endpoint : endpoints_) {
try {
- if (client_ == nullptr) {
- client_ =
std::make_shared<ThriftConnection>(selectionPolicy_(endpoints_));
- client_->init(userName_, password_, enableRPCCompression_,
zoneId_, version);
+ if (client_ == nullptr) {
+ client_ = std::make_shared<ThriftConnection>(endpoint);
+ client_->init(userName_, password_, enableRPCCompression_, zoneId_,
version);
+ }
+
+ auto sessionDataSet =
client_->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
+
+ uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
+ auto columnNames = sessionDataSet->getColumnNames();
+ for (uint32_t i = 0; i < columnNames.size(); i++) {
+ if (columnNames[i] == IP_COLUMN_NAME) {
+ columnAddrIdx = i;
+ } else if (columnNames[i] == PORT_COLUMN_NAME) {
+ columnPortIdx = i;
+ } else if (columnNames[i] == STATUS_COLUMN_NAME) {
+ columnStatusIdx = i;
}
+ }
- auto sessionDataSet =
client_->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
-
- uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
- auto columnNames = sessionDataSet->getColumnNames();
- for (uint32_t i = 0; i < columnNames.size(); i++) {
- if (columnNames[i] == IP_COLUMN_NAME) {
- columnAddrIdx = i;
- } else if (columnNames[i] == PORT_COLUMN_NAME) {
- columnPortIdx = i;
- } else if (columnNames[i] == STATUS_COLUMN_NAME) {
- columnStatusIdx = i;
- }
- }
+ if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx == -1)
{
+ throw IoTDBException("Required columns not found in query result.");
+ }
- if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx ==
-1) {
- throw IoTDBException("Required columns not found in query
result.");
- }
+ std::vector<TEndPoint> ret;
+ while (sessionDataSet->hasNext()) {
+ auto record = sessionDataSet->next();
+ std::string ip;
+ int32_t port = 0;
+ std::string status;
- std::vector<TEndPoint> ret;
- while (sessionDataSet->hasNext()) {
- auto record = sessionDataSet->next();
- std::string ip;
- int32_t port;
- std::string status;
- if (record->fields.at(columnAddrIdx).stringV.is_initialized()) {
- ip = record->fields.at(columnAddrIdx).stringV.value();
- }
- if (record->fields.at(columnPortIdx).intV.is_initialized()) {
- port = record->fields.at(columnPortIdx).intV.value();
- }
- if (record->fields.at(columnStatusIdx).stringV.is_initialized()) {
- status = record->fields.at(columnStatusIdx).stringV.value();
- }
- if (ip == "0.0.0.0" || status == REMOVING_STATUS) {
- log_warn("Skipping invalid node: " + ip + ":" +
to_string(port));
- continue;
- }
- TEndPoint endpoint;
- endpoint.ip = ip;
- endpoint.port = port;
- ret.emplace_back(endpoint);
+ if (record->fields.at(columnAddrIdx).stringV.is_initialized()) {
+ ip = record->fields.at(columnAddrIdx).stringV.value();
+ }
+ if (record->fields.at(columnPortIdx).intV.is_initialized()) {
+ port = record->fields.at(columnPortIdx).intV.value();
+ }
+ if (record->fields.at(columnStatusIdx).stringV.is_initialized()) {
+ status = record->fields.at(columnStatusIdx).stringV.value();
}
- return ret;
- } catch (const IoTDBException& e) {
- client_.reset();
- throw IoTDBException(std::string("NodesSupplier::fetchLatestEndpoints
failed: ") + e.what());
+ if (ip == "0.0.0.0" || status != RUNNING_STATUS) {
+ log_warn("Skipping invalid node: " + ip + ":" +
std::to_string(port));
+ continue;
+ }
+ TEndPoint newEndpoint;
+ newEndpoint.ip = ip;
+ newEndpoint.port = port;
+ ret.emplace_back(newEndpoint);
+ }
+ return ret; // success
+ } catch (const std::exception& e) {
+ log_warn("Failed to fetch endpoints from " + endpoint.ip + ":" +
+ std::to_string(endpoint.port) + " , error=" + e.what());
+ client_.reset(); // reset client before retrying next endpoint
+ continue; // try next endpoint
}
+ }
+ throw IoTDBException("NodesSupplier::fetchLatestEndpoints failed: all nodes
unreachable.");
}
void NodesSupplier::refreshEndpointList() {
diff --git a/iotdb-client/client-cpp/src/main/NodesSupplier.h
b/iotdb-client/client-cpp/src/main/NodesSupplier.h
index a3cda24deac..7e1a6e5af3f 100644
--- a/iotdb-client/client-cpp/src/main/NodesSupplier.h
+++ b/iotdb-client/client-cpp/src/main/NodesSupplier.h
@@ -65,6 +65,7 @@ private:
class NodesSupplier : public INodesSupplier {
public:
static const std::string SHOW_DATA_NODES_COMMAND;
+ static const std::string RUNNING_STATUS;
static const std::string STATUS_COLUMN_NAME;
static const std::string IP_COLUMN_NAME;
static const std::string PORT_COLUMN_NAME;
diff --git a/iotdb-client/client-cpp/src/main/Session.cpp
b/iotdb-client/client-cpp/src/main/Session.cpp
index 154a9f6734a..6b5dd6ab5d7 100644
--- a/iotdb-client/client-cpp/src/main/Session.cpp
+++ b/iotdb-client/client-cpp/src/main/Session.cpp
@@ -725,12 +725,43 @@ void Session::initZoneId() {
zoneId_ = zoneStr;
}
-void Session::initNodesSupplier() {
+void Session::initNodesSupplier(const std::vector<std::string>& nodeUrls) {
std::vector<TEndPoint> endPoints;
- TEndPoint endPoint;
- endPoint.__set_ip(host_);
- endPoint.__set_port(rpcPort_);
- endPoints.emplace_back(endPoint);
+ std::unordered_set<std::string> uniqueEndpoints;
+
+ if (nodeUrls.empty() && host_.empty()) {
+ throw IoTDBException("No available nodes");
+ }
+
+ // Process provided node URLs
+ if (!nodeUrls.empty()) {
+ for (auto& url : nodeUrls) {
+ try {
+ TEndPoint endPoint =
UrlUtils::parseTEndPointIpv4AndIpv6Url(url);
+ if (endPoint.port == 0) continue; // Skip invalid endpoints
+
+ std::string endpointKey = endPoint.ip + ":" +
std::to_string(endPoint.port);
+ if (uniqueEndpoints.find(endpointKey) ==
uniqueEndpoints.end()) {
+ endPoints.emplace_back(std::move(endPoint));
+ uniqueEndpoints.insert(std::move(endpointKey));
+ }
+ } catch (...) {
+ continue; // Skip malformed URLs
+ }
+ }
+ }
+
+ // Fallback to local endpoint if no valid endpoints found
+ if (endPoints.empty()) {
+ if (host_.empty() || rpcPort_ == 0) {
+ throw IoTDBException("No valid endpoints available");
+ }
+ TEndPoint endPoint;
+ endPoint.__set_ip(host_);
+ endPoint.__set_port(rpcPort_);
+ endPoints.emplace_back(std::move(endPoint));
+ }
+
if (enableAutoFetch_) {
nodesSupplier_ = NodesSupplier::create(endPoints, username_,
password_);
}
@@ -740,11 +771,35 @@ void Session::initNodesSupplier() {
}
void Session::initDefaultSessionConnection() {
- defaultEndPoint_.__set_ip(host_);
- defaultEndPoint_.__set_port(rpcPort_);
- defaultSessionConnection_ = make_shared<SessionConnection>(this,
defaultEndPoint_, zoneId_, nodesSupplier_, fetchSize_,
- 60, 500,
- sqlDialect_,
database_);
+ // Try all endpoints from supplier until a connection is established.
+ auto endpoints = nodesSupplier_->getEndPointList();
+ bool connected = false;
+
+ for (const auto& endpoint : endpoints) {
+ try {
+ host_ = endpoint.ip;
+ rpcPort_ = endpoint.port;
+
+ defaultEndPoint_.__set_ip(host_);
+ defaultEndPoint_.__set_port(rpcPort_);
+
+ defaultSessionConnection_ = std::make_shared<SessionConnection>(
+ this, defaultEndPoint_, zoneId_, nodesSupplier_, fetchSize_,
+ 3,
+ 500, connectTimeoutMs_,
+ sqlDialect_, database_);
+
+ connected = true;
+ break;
+ } catch (const std::exception& e) {
+ std::cout << "Failed to connect to " << endpoint.ip << ":"
+ << endpoint.port << " , error=" << e.what() << std::endl;
+ }
+ }
+
+ if (!connected) {
+ throw std::runtime_error("No available node to establish
SessionConnection.");
+ }
}
void Session::insertStringRecordsWithLeaderCache(vector<string> deviceIds,
vector<int64_t> times,
@@ -1855,6 +1910,9 @@ void Session::createAlignedTimeseries(const std::string&
deviceId,
bool Session::checkTimeseriesExists(const string& path) {
try {
std::unique_ptr<SessionDataSet> dataset = executeQueryStatement("SHOW
TIMESERIES " + path);
+ if (dataset == nullptr) {
+ throw IoTDBException("executeQueryStatement failed");
+ }
bool isExisted = dataset->hasNext();
dataset->closeOperationHandle();
return isExisted;
@@ -1879,7 +1937,7 @@ shared_ptr<SessionConnection>
Session::getQuerySessionConnection() {
shared_ptr<SessionConnection> newConnection;
try {
newConnection = make_shared<SessionConnection>(this, endPoint.value(),
zoneId_, nodesSupplier_,
- fetchSize_, 60, 500,
sqlDialect_, database_);
+ fetchSize_, 60, 500,
connectTimeoutMs_, sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint.value(), newConnection);
return newConnection;
}
@@ -1937,7 +1995,7 @@ void Session::handleQueryRedirection(TEndPoint endPoint) {
else {
try {
newConnection = make_shared<SessionConnection>(this, endPoint,
zoneId_, nodesSupplier_,
- fetchSize_, 60,
500, sqlDialect_, database_);
+ fetchSize_, 60,
500, connectTimeoutMs_, sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint, newConnection);
}
@@ -1961,7 +2019,7 @@ void Session::handleRedirection(const std::string&
deviceId, TEndPoint endPoint)
else {
try {
newConnection = make_shared<SessionConnection>(this, endPoint,
zoneId_, nodesSupplier_,
- fetchSize_, 60,
500, sqlDialect_, database_);
+ fetchSize_, 60,
500, connectTimeoutMs_, sqlDialect_, database_); // use the session's configured connect timeout, as the other SessionConnection call sites do, instead of a hard-coded 1000 ms
endPointToSessionConnection.emplace(endPoint, newConnection);
}
catch (exception& e) {
@@ -1984,7 +2042,7 @@ void Session::handleRedirection(const
std::shared_ptr<storage::IDeviceID>& devic
else {
try {
newConnection = make_shared<SessionConnection>(this, endPoint,
zoneId_, nodesSupplier_,
- fetchSize_, 60,
500, sqlDialect_, database_);
+ fetchSize_, 3, 500,
connectTimeoutMs_, sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint, newConnection);
}
catch (exception& e) {
diff --git a/iotdb-client/client-cpp/src/main/Session.h
b/iotdb-client/client-cpp/src/main/Session.h
index cd418ff5454..35c05fad22c 100644
--- a/iotdb-client/client-cpp/src/main/Session.h
+++ b/iotdb-client/client-cpp/src/main/Session.h
@@ -535,6 +535,7 @@ class Session {
private:
std::string host_;
int rpcPort_;
+ std::vector<string> nodeUrls_;
std::string username_;
std::string password_;
const TSProtocolVersion::type protocolVersion_ =
TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
@@ -543,6 +544,7 @@ private:
int fetchSize_;
const static int DEFAULT_FETCH_SIZE = 10000;
const static int DEFAULT_TIMEOUT_MS = 0;
+ int connectTimeoutMs_ = 3 * 1000; // connect timeout in ms; same default as AbstractSessionBuilder::connectTimeoutMs, so constructors that never assign it do not read an indeterminate value
Version::Version version;
std::string sqlDialect_ = "tree"; // default sql dialect
std::string database_;
@@ -605,7 +607,7 @@ private:
void initZoneId();
- void initNodesSupplier();
+ void initNodesSupplier(const std::vector<std::string>& nodeUrls =
std::vector<std::string>());
void initDefaultSessionConnection();
@@ -664,6 +666,12 @@ public:
initNodesSupplier();
}
+    /**
+     * Create a session that connects through a list of candidate node URLs
+     * ("ip:port" / "[ipv6]:port") instead of a single host and port.
+     *
+     * rpcPort_, fetchSize_ and connectTimeoutMs_ are given explicit defaults
+     * here: they have no in-class initializer and later connection setup
+     * reads them.
+     *
+     * @param nodeUrls candidate endpoints; handed to initNodesSupplier
+     * @param username login user
+     * @param password login password
+     */
+    Session(const std::vector<string>& nodeUrls, const std::string& username, const std::string& password)
+        : rpcPort_(0), nodeUrls_(nodeUrls), username_(username), password_(password),
+          fetchSize_(DEFAULT_FETCH_SIZE), connectTimeoutMs_(3 * 1000), version(Version::V_1_0) {
+        initZoneId();
+        initNodesSupplier(this->nodeUrls_);
+    }
+
Session(const std::string& host, int rpcPort, const std::string& username,
const std::string& password)
: fetchSize_(DEFAULT_FETCH_SIZE) {
this->host_ = host;
@@ -714,8 +722,10 @@ public:
this->database_ = builder->database;
this->enableAutoFetch_ = builder->enableAutoFetch;
this->enableRedirection_ = builder->enableRedirections;
+ this->connectTimeoutMs_ = builder->connectTimeoutMs;
+ this->nodeUrls_ = builder->nodeUrls;
initZoneId();
- initNodesSupplier();
+ initNodesSupplier(this->nodeUrls_);
}
~Session();
diff --git a/iotdb-client/client-cpp/src/main/TableSessionBuilder.h
b/iotdb-client/client-cpp/src/main/SessionBuilder.h
similarity index 54%
copy from iotdb-client/client-cpp/src/main/TableSessionBuilder.h
copy to iotdb-client/client-cpp/src/main/SessionBuilder.h
index 5c1b1a0a22e..34f92dd22f6 100644
--- a/iotdb-client/client-cpp/src/main/TableSessionBuilder.h
+++ b/iotdb-client/client-cpp/src/main/SessionBuilder.h
@@ -17,60 +17,74 @@
* under the License.
*/
-// This file is a translation of the Java file
iotdb-client/session/src/main/java/org/apache/iotdb/session/TableSessionBuilder.java
+#ifndef IOTDB_SESSION_BUILDER_H
+#define IOTDB_SESSION_BUILDER_H
-#ifndef IOTDB_TABLESESSIONBUILDER_H
-#define IOTDB_TABLESESSIONBUILDER_H
-
-#include "TableSession.h"
#include "AbstractSessionBuilder.h"
-class TableSessionBuilder : public AbstractSessionBuilder {
- /*
- std::string host;
- int rpcPort;
- std::string username;
- std::string password;
- std::string zoneId;
- int fetchSize;
- std::string sqlDialect = "tree"; // default sql dialect
- std::string database;
- */
+class SessionBuilder : public AbstractSessionBuilder {
public:
- TableSessionBuilder* host(const std::string &host) {
+ SessionBuilder* host(const std::string &host) {
AbstractSessionBuilder::host = host;
return this;
}
- TableSessionBuilder* rpcPort(int rpcPort) {
+
+ SessionBuilder* rpcPort(int rpcPort) {
AbstractSessionBuilder::rpcPort = rpcPort;
return this;
}
- TableSessionBuilder* username(const std::string &username) {
+
+ SessionBuilder* username(const std::string &username) {
AbstractSessionBuilder::username = username;
return this;
}
- TableSessionBuilder* password(const std::string &password) {
+
+ SessionBuilder* password(const std::string &password) {
AbstractSessionBuilder::password = password;
return this;
}
- TableSessionBuilder* zoneId(const std::string &zoneId) {
+
+ SessionBuilder* zoneId(const std::string &zoneId) {
AbstractSessionBuilder::zoneId = zoneId;
return this;
}
- TableSessionBuilder* fetchSize(int fetchSize) {
+
+ SessionBuilder* fetchSize(int fetchSize) {
AbstractSessionBuilder::fetchSize = fetchSize;
return this;
}
- TableSessionBuilder* database(const std::string &database) {
+
+ SessionBuilder* database(const std::string &database) {
AbstractSessionBuilder::database = database;
return this;
}
- TableSession* build() {
- sqlDialect = "table";
+
+ SessionBuilder* nodeUrls(const std::vector<std::string>& nodeUrls) {
+ AbstractSessionBuilder::nodeUrls = nodeUrls;
+ return this;
+ }
+
+ SessionBuilder* enableAutoFetch(bool enableAutoFetch) {
+ AbstractSessionBuilder::enableAutoFetch = enableAutoFetch;
+ return this;
+ }
+
+ SessionBuilder* enableRedirections(bool enableRedirections) {
+ AbstractSessionBuilder::enableRedirections = enableRedirections;
+ return this;
+ }
+
+ SessionBuilder* enableRPCCompression(bool enableRPCCompression) {
+ AbstractSessionBuilder::enableRPCCompression = enableRPCCompression;
+ return this;
+ }
+
+ Session* build() { // creates a tree-dialect Session from this builder, opens it, and hands ownership to the caller
+ sqlDialect = "tree";
Session* newSession = new Session(this);
newSession->open(false);
- return new TableSession(newSession);
+ return newSession; // return the session that was just opened; constructing a second Session here leaked the opened one and returned an unopened instance
}
};
-#endif // IOTDB_TABLESESSIONBUILDER_H
\ No newline at end of file
+#endif // IOTDB_SESSION_BUILDER_H
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.cpp
b/iotdb-client/client-cpp/src/main/SessionConnection.cpp
index 18714172374..a6928b14d29 100644
--- a/iotdb-client/client-cpp/src/main/SessionConnection.cpp
+++ b/iotdb-client/client-cpp/src/main/SessionConnection.cpp
@@ -35,6 +35,7 @@ SessionConnection::SessionConnection(Session* session_ptr,
const TEndPoint& endp
int fetchSize,
int maxRetries,
int64_t retryInterval,
+ int64_t connectionTimeout,
std::string dialect,
std::string db)
: session(session_ptr),
@@ -44,6 +45,7 @@ SessionConnection::SessionConnection(Session* session_ptr,
const TEndPoint& endp
fetchSize(fetchSize),
maxRetryCount(maxRetries),
retryIntervalMs(retryInterval),
+ connectionTimeoutInMs(connectionTimeout),
sqlDialect(std::move(dialect)),
database(std::move(db)) {
this->zoneId = zoneId.empty() ? getSystemDefaultZoneId() : zoneId;
diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.h
b/iotdb-client/client-cpp/src/main/SessionConnection.h
index 915d25702c1..7102222b216 100644
--- a/iotdb-client/client-cpp/src/main/SessionConnection.h
+++ b/iotdb-client/client-cpp/src/main/SessionConnection.h
@@ -37,8 +37,9 @@ public:
const std::string& zoneId,
std::shared_ptr<INodesSupplier> nodeSupplier,
int fetchSize = 10000,
- int maxRetries = 60,
+ int maxRetries = 3,
int64_t retryInterval = 500,
+ int64_t connectionTimeoutMs = 3 * 1000,
std::string dialect = "tree",
std::string db = "");
diff --git a/iotdb-client/client-cpp/src/main/TableSessionBuilder.h
b/iotdb-client/client-cpp/src/main/TableSessionBuilder.h
index 5c1b1a0a22e..db6c749d184 100644
--- a/iotdb-client/client-cpp/src/main/TableSessionBuilder.h
+++ b/iotdb-client/client-cpp/src/main/TableSessionBuilder.h
@@ -65,6 +65,10 @@ public:
AbstractSessionBuilder::database = database;
return this;
}
+    // Sets the candidate node URLs ("ip:port") stored in
+    // AbstractSessionBuilder::nodeUrls; returns this for call chaining.
+    // The element type is spelled std::string so the header does not depend
+    // on a using-directive, matching SessionBuilder::nodeUrls.
+    TableSessionBuilder* nodeUrls(const std::vector<std::string>& nodeUrls) {
+        AbstractSessionBuilder::nodeUrls = nodeUrls;
+        return this;
+    }
TableSession* build() {
sqlDialect = "table";
Session* newSession = new Session(this);
diff --git a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
index bf2c7521769..3c0657b07ef 100644
--- a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
+++ b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
@@ -19,6 +19,7 @@
#include "catch.hpp"
#include "Session.h"
+#include "SessionBuilder.h"
using namespace std;
@@ -62,6 +63,42 @@ TEST_CASE("Create timeseries success", "[createTimeseries]")
{
session->deleteTimeseries("root.test.d1.s1");
}
+// Exercises the Session(nodeUrls, username, password) constructor end to end:
+// open, ensure root.test.d1.s1 exists, verify, delete it and close.
+TEST_CASE("Test Session constructor with nodeUrls", "[SessionInitAndOperate]") {
+    CaseReporter cr("SessionInitWithNodeUrls");
+
+    const std::vector<std::string> urls{"127.0.0.1:6667"};
+    auto nodeUrlSession = std::make_shared<Session>(urls, "root", "root");
+    nodeUrlSession->open();
+
+    const bool existedBefore = nodeUrlSession->checkTimeseriesExists("root.test.d1.s1");
+    if (!existedBefore) {
+        nodeUrlSession->createTimeseries("root.test.d1.s1", TSDataType::INT64,
+                                         TSEncoding::RLE, CompressionType::SNAPPY);
+    }
+    REQUIRE(nodeUrlSession->checkTimeseriesExists("root.test.d1.s1") == true);
+
+    nodeUrlSession->deleteTimeseries("root.test.d1.s1");
+    nodeUrlSession->close();
+}
+
+// Exercises SessionBuilder::nodeUrls()/build() end to end: build a session
+// from a node URL list, ensure root.test.d1.s1 exists, verify, delete it and
+// close.
+TEST_CASE("Test Session builder with nodeUrls", "[SessionBuilderInit]") {
+    // Distinct reporter label: the previous value duplicated the label of the
+    // constructor test above.
+    CaseReporter cr("SessionBuilderInitWithNodeUrls");
+
+    std::vector<std::string> nodeUrls = {"127.0.0.1:6667"};
+    auto builder = std::unique_ptr<SessionBuilder>(new SessionBuilder());
+    std::shared_ptr<Session> session =
+        std::shared_ptr<Session>(
+            builder
+            ->username("root")
+            ->password("root")
+            ->nodeUrls(nodeUrls)
+            ->build()
+        );
+    session->open();
+    if (!session->checkTimeseriesExists("root.test.d1.s1")) {
+        session->createTimeseries("root.test.d1.s1", TSDataType::INT64,
+                                  TSEncoding::RLE, CompressionType::SNAPPY);
+    }
+    REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == true);
+    session->deleteTimeseries("root.test.d1.s1");
+    session->close();
+}
+
TEST_CASE("Delete timeseries success", "[deleteTimeseries]") {
CaseReporter cr("deleteTimeseries");
if (!session->checkTimeseriesExists("root.test.d1.s1")) {
@@ -715,3 +752,62 @@ TEST_CASE("Test executeLastDataQuery ",
"[testExecuteLastDataQuery]") {
sessionDataSet->setFetchSize(1024);
REQUIRE(sessionDataSet->hasNext() == false);
}
+
+// Helper function for comparing TEndPoint with detailed error message
+void assertTEndPointEqual(const TEndPoint& actual,
+ const std::string& expectedIp,
+ int expectedPort,
+ const char* file,
+ int line) {
+ if (actual.ip != expectedIp || actual.port != expectedPort) {
+ std::stringstream ss;
+ ss << "\nTEndPoint mismatch:\nExpected: " << expectedIp << ":" <<
expectedPort
+ << "\nActual: " << actual.ip << ":" << actual.port;
+ Catch::SourceLineInfo location(file, line);
+ Catch::AssertionHandler handler("TEndPoint comparison", location,
ss.str(), Catch::ResultDisposition::Normal);
+ handler.handleMessage(Catch::ResultWas::ExplicitFailure, ss.str());
+ handler.complete();
+ }
+}
+
+// Macro to simplify test assertions
+#define REQUIRE_TENDPOINT(actual, expectedIp, expectedPort) \
+ assertTEndPointEqual(actual, expectedIp, expectedPort, __FILE__, __LINE__)
+
+TEST_CASE("UrlUtils - parseTEndPointIpv4AndIpv6Url", "[UrlUtils]") {
+ // Plain IPv4 "ip:port" input: ip and port are split at the last ':'
+ SECTION("Valid IPv4") {
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("192.168.1.1:8080"),
"192.168.1.1", 8080);
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("10.0.0.1:80"),
"10.0.0.1", 80);
+ }
+
+ // Bracketed IPv6 "[addr]:port" input: the enclosing brackets are stripped from the ip
+ SECTION("Valid IPv6") {
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("[2001:db8::1]:8080"),
"2001:db8::1", 8080);
+ REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("[::1]:80"),
"::1", 80);
+ }
+
+ // Host names are passed through unchanged as the ip field
+ SECTION("Hostnames") {
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("localhost:8080"),
"localhost", 8080);
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("example.com:443"),
"example.com", 443);
+ }
+
+ // Empty input yields a default endpoint; input without ':' yields ip only with port 0
+ SECTION("Edge cases") {
+ REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url(""), "", 0);
+ REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("127.0.0.1"),
"127.0.0.1", 0);
+ }
+
+ // Non-numeric port: the whole input becomes the ip and port stays 0; mismatched brackets are kept verbatim
+ SECTION("Invalid inputs") {
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("192.168.1.1:abc"),
"192.168.1.1:abc", 0);
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("]invalid[:80"),
"]invalid[", 80);
+ }
+
+ // Boundary ports 0 and 65535 are accepted as-is
+ SECTION("Port ranges") {
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("localhost:0"),
"localhost", 0);
+
REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("127.0.0.1:65535"),
"127.0.0.1", 65535);
+ }
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/test/cpp/sessionRelationalIT.cpp
b/iotdb-client/client-cpp/src/test/cpp/sessionRelationalIT.cpp
index 0912ddd5bf3..94f956b56ea 100644
--- a/iotdb-client/client-cpp/src/test/cpp/sessionRelationalIT.cpp
+++ b/iotdb-client/client-cpp/src/test/cpp/sessionRelationalIT.cpp
@@ -19,6 +19,7 @@
#include "catch.hpp"
#include "TableSession.h"
+#include "TableSessionBuilder.h"
#include <math.h>
using namespace std;
@@ -67,6 +68,29 @@ TEST_CASE("Create table success", "[createTable]") {
REQUIRE(tableExist == true);
}
+// Exercises TableSessionBuilder::nodeUrls()/build(): builds a TableSession
+// from a node URL list and (re)creates the databases db1 and db2 through it.
+TEST_CASE("Test TableSession builder with nodeUrls", "[SessionBuilderInit]") {
+    CaseReporter cr("TableSessionInitWithNodeUrls");
+
+    const std::vector<std::string> urls{"127.0.0.1:6667"};
+    std::unique_ptr<TableSessionBuilder> tableBuilder(new TableSessionBuilder());
+    tableBuilder->username("root");
+    tableBuilder->password("root");
+    tableBuilder->nodeUrls(urls);
+    std::shared_ptr<TableSession> tableSession(tableBuilder->build());
+    tableSession->open();
+
+    // Same statements, same order: drop-then-create for each database.
+    const char* const statements[] = {
+        "DROP DATABASE IF EXISTS db1",
+        "CREATE DATABASE db1",
+        "DROP DATABASE IF EXISTS db2",
+        "CREATE DATABASE db2",
+    };
+    for (const char* sql : statements) {
+        tableSession->executeNonQueryStatement(sql);
+    }
+
+    tableSession->close();
+}
+
TEST_CASE("Test insertRelationalTablet", "[testInsertRelationalTablet]") {
CaseReporter cr("testInsertRelationalTablet");
session->executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS db1");