This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-csharp.git
The following commit(s) were added to refs/heads/main by this push:
new 54545a5 Support Client TLS (#43)
54545a5 is described below
commit 54545a5baaedda645524423aba5e9b74c1a3f9d1
Author: Haonan <[email protected]>
AuthorDate: Tue Jan 20 11:36:43 2026 +0800
Support Client TLS (#43)
---
src/Apache.IoTDB/SessionPool.Builder.cs | 21 ++++++++++++---
src/Apache.IoTDB/SessionPool.cs | 39 ++++++++++++++++------------
src/Apache.IoTDB/TableSessionPool.Builder.cs | 18 +++++++++++--
3 files changed, 57 insertions(+), 21 deletions(-)
diff --git a/src/Apache.IoTDB/SessionPool.Builder.cs
b/src/Apache.IoTDB/SessionPool.Builder.cs
index f943d81..9de2874 100644
--- a/src/Apache.IoTDB/SessionPool.Builder.cs
+++ b/src/Apache.IoTDB/SessionPool.Builder.cs
@@ -17,7 +17,6 @@
* under the License.
*/
-using System;
using System.Collections.Generic;
namespace Apache.IoTDB;
@@ -35,6 +34,8 @@ public partial class SessionPool
private int _poolSize = 8;
private bool _enableRpcCompression = false;
private int _connectionTimeoutInMs = 500;
+ private bool _useSsl = false;
+ private string _certificatePath = null;
private string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
private string _database = "";
private List<string> _nodeUrls = new List<string>();
@@ -93,6 +94,18 @@ public partial class SessionPool
return this;
}
+ public Builder SetUseSsl(bool useSsl)
+ {
+ _useSsl = useSsl;
+ return this;
+ }
+
+ public Builder SetCertificatePath(string certificatePath)
+ {
+ _certificatePath = certificatePath;
+ return this;
+ }
+
public Builder SetNodeUrl(List<string> nodeUrls)
{
_nodeUrls = nodeUrls;
@@ -122,6 +135,8 @@ public partial class SessionPool
_poolSize = 8;
_enableRpcCompression = false;
_connectionTimeoutInMs = 500;
+ _useSsl = false;
+ _certificatePath = null;
_sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
_database = "";
}
@@ -131,9 +146,9 @@ public partial class SessionPool
// if nodeUrls is not empty, use nodeUrls to create session pool
if (_nodeUrls.Count > 0)
{
- return new SessionPool(_nodeUrls, _username, _password,
_fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs,
_sqlDialect, _database);
+ return new SessionPool(_nodeUrls, _username, _password,
_fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs,
_useSsl, _certificatePath, _sqlDialect, _database);
}
- return new SessionPool(_host, _port, _username, _password,
_fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs,
_sqlDialect, _database);
+ return new SessionPool(_host, _port, _username, _password,
_fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs,
_useSsl, _certificatePath, _sqlDialect, _database);
}
}
}
diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index 135199b..fc2eca0 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -19,13 +19,12 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
-using System.Net.Sockets;
-using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
+using System.Security.Cryptography.X509Certificates;
using Apache.IoTDB.DataStructure;
-using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Thrift;
using Thrift.Protocol;
@@ -47,6 +46,8 @@ namespace Apache.IoTDB
private readonly List<TEndPoint> _endPoints = new();
private readonly string _host;
private readonly int _port;
+ private readonly bool _useSsl;
+ private readonly string _certificatePath;
private readonly int _fetchSize;
/// <summary>
/// _timeout is the amount of time a Session will wait for a send
operation to complete successfully.
@@ -86,10 +87,10 @@ namespace Apache.IoTDB
{
}
public SessionPool(string host, int port, string username, string
password, int fetchSize, string zoneId, int poolSize, bool
enableRpcCompression, int timeout)
- : this(host, port, username, password, fetchSize,
zoneId, poolSize, enableRpcCompression, timeout,
IoTDBConstant.TREE_SQL_DIALECT, "")
+ : this(host, port, username, password, fetchSize,
zoneId, poolSize, enableRpcCompression, timeout, false, null,
IoTDBConstant.TREE_SQL_DIALECT, "")
{
}
- protected internal SessionPool(string host, int port, string username,
string password, int fetchSize, string zoneId, int poolSize, bool
enableRpcCompression, int timeout, string sqlDialect, string database)
+ protected internal SessionPool(string host, int port, string username,
string password, int fetchSize, string zoneId, int poolSize, bool
enableRpcCompression, int timeout, bool useSsl, string certificatePath, string
sqlDialect, string database)
{
_host = host;
_port = port;
@@ -101,6 +102,8 @@ namespace Apache.IoTDB
_poolSize = poolSize;
_enableRpcCompression = enableRpcCompression;
_timeout = timeout;
+ _useSsl = useSsl;
+ _certificatePath = certificatePath;
_sqlDialect = sqlDialect;
_database = database;
}
@@ -126,11 +129,11 @@ namespace Apache.IoTDB
{
}
public SessionPool(List<string> nodeUrls, string username, string
password, int fetchSize, string zoneId, int poolSize, bool
enableRpcCompression, int timeout)
- : this(nodeUrls, username, password, fetchSize,
zoneId, poolSize, enableRpcCompression, timeout,
IoTDBConstant.TREE_SQL_DIALECT, "")
+ : this(nodeUrls, username, password, fetchSize,
zoneId, poolSize, enableRpcCompression, timeout, false, null,
IoTDBConstant.TREE_SQL_DIALECT, "")
{
}
- protected internal SessionPool(List<string> nodeUrls, string username,
string password, int fetchSize, string zoneId, int poolSize, bool
enableRpcCompression, int timeout, string sqlDialect, string database)
+ protected internal SessionPool(List<string> nodeUrls, string username,
string password, int fetchSize, string zoneId, int poolSize, bool
enableRpcCompression, int timeout, bool useSsl, string certificatePath, string
sqlDialect, string database)
{
if (nodeUrls.Count == 0)
{
@@ -146,6 +149,8 @@ namespace Apache.IoTDB
_poolSize = poolSize;
_enableRpcCompression = enableRpcCompression;
_timeout = timeout;
+ _useSsl = useSsl;
+ _certificatePath = certificatePath;
_sqlDialect = sqlDialect;
_database = database;
}
@@ -241,7 +246,7 @@ namespace Apache.IoTDB
{
try
{
- _clients.Add(await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken));
+ _clients.Add(await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, _useSsl, _certificatePath, _sqlDialect,
_database, cancellationToken));
}
catch (Exception e)
{
@@ -264,7 +269,7 @@ namespace Apache.IoTDB
var endPoint = _endPoints[endPointIndex];
try
{
- var client = await CreateAndOpen(endPoint.Ip,
endPoint.Port, _enableRpcCompression, _timeout, _sqlDialect, _database,
cancellationToken);
+ var client = await CreateAndOpen(endPoint.Ip,
endPoint.Port, _enableRpcCompression, _timeout, _useSsl, _certificatePath,
_sqlDialect, _database, cancellationToken);
_clients.Add(client);
isConnected = true;
startIndex = (endPointIndex + 1) %
_endPoints.Count;
@@ -303,7 +308,7 @@ namespace Apache.IoTDB
{
try
{
- var client = await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
+ var client = await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, _useSsl, _certificatePath, _sqlDialect,
_database, cancellationToken);
return client;
}
catch (Exception e)
@@ -330,7 +335,7 @@ namespace Apache.IoTDB
int j = (startIndex + i) % _endPoints.Count;
try
{
- var client = await CreateAndOpen(_endPoints[j].Ip,
_endPoints[j].Port, _enableRpcCompression, _timeout, _sqlDialect, _database,
cancellationToken);
+ var client = await CreateAndOpen(_endPoints[j].Ip,
_endPoints[j].Port, _enableRpcCompression, _timeout, _useSsl, _certificatePath,
_sqlDialect, _database, cancellationToken);
return client;
}
catch (Exception e)
@@ -423,12 +428,14 @@ namespace Apache.IoTDB
}
}
- private async Task<Client> CreateAndOpen(string host, int port, bool
enableRpcCompression, int timeout, string sqlDialect, string database,
CancellationToken cancellationToken = default)
+ private async Task<Client> CreateAndOpen(string host, int port, bool
enableRpcCompression, int timeout, bool useSsl, string cert, string sqlDialect,
string database, CancellationToken cancellationToken = default)
{
- var tcpClient = new TcpClient(host, port);
- tcpClient.SendTimeout = timeout;
- tcpClient.ReceiveTimeout = timeout;
- var transport = new TFramedTransport(new
TSocketTransport(tcpClient, null));
+
+ TTransport socket = useSsl ?
+ new TTlsSocketTransport(host, port, null, timeout, new
X509Certificate2(File.ReadAllBytes(cert))) :
+ new TSocketTransport(host, port, null, timeout);
+
+ var transport = new TFramedTransport(socket);
if (!transport.IsOpen)
{
diff --git a/src/Apache.IoTDB/TableSessionPool.Builder.cs
b/src/Apache.IoTDB/TableSessionPool.Builder.cs
index 07387b5..10e24c8 100644
--- a/src/Apache.IoTDB/TableSessionPool.Builder.cs
+++ b/src/Apache.IoTDB/TableSessionPool.Builder.cs
@@ -37,6 +37,8 @@ public partial class TableSessionPool
private int _poolSize = 8;
private bool _enableRpcCompression = false;
private int _connectionTimeoutInMs = 500;
+ private bool _useSsl = false;
+ private string _certificatePath = null;
private string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
private string _database = "";
private List<string> _nodeUrls = new List<string>();
@@ -95,6 +97,18 @@ public partial class TableSessionPool
return this;
}
+ public Builder SetUseSsl(bool useSsl)
+ {
+ _useSsl = useSsl;
+ return this;
+ }
+
+ public Builder SetCertificatePath(string certificatePath)
+ {
+ _certificatePath = certificatePath;
+ return this;
+ }
+
public Builder SetNodeUrls(List<string> nodeUrls)
{
_nodeUrls = nodeUrls;
@@ -134,11 +148,11 @@ public partial class TableSessionPool
// if nodeUrls is not empty, use nodeUrls to create session pool
if (_nodeUrls.Count > 0)
{
- sessionPool = new SessionPool(_nodeUrls, _username, _password,
_fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs,
_sqlDialect, _database);
+ sessionPool = new SessionPool(_nodeUrls, _username, _password,
_fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs,
_useSsl, _certificatePath, _sqlDialect, _database);
}
else
{
- sessionPool = new SessionPool(_host, _port, _username,
_password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression,
_connectionTimeoutInMs, _sqlDialect, _database);
+ sessionPool = new SessionPool(_host, _port, _username,
_password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression,
_connectionTimeoutInMs, _useSsl, _certificatePath, _sqlDialect, _database);
}
return new TableSessionPool(sessionPool);
}