This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 87ce8f90 [Issue 1094][doc] Enhance ConnectionTimeout/KeepAliveInterval comments (#1488)
87ce8f90 is described below
commit 87ce8f90d0e2df9402133ebc52560638bb09778d
Author: zhou zhuohan <[email protected]>
AuthorDate: Tue May 19 19:27:08 2026 +0800
[Issue 1094][doc] Enhance ConnectionTimeout/KeepAliveInterval comments (#1488)
---
pulsar/client.go | 17 ++++++++++++++++-
pulsar/client_impl_test.go | 30 ++++++++++++++++++++++++++++++
pulsar/internal/helper.go | 10 ++++++++++
3 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index ec3438b3..ac0d14d2 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -90,7 +90,17 @@ type ClientOptions struct {
// This parameter is required
URL string
- // Timeout for the establishment of a TCP connection (default: 5 seconds)
+ // Timeout for the establishment of a TCP connection (default: 0).
+ // This parameter only controls the TCP connection establishment phase (including TLS handshake
+ // if TLS is enabled). It does NOT affect the timeout for connection disruption detection after
+ // the connection is established. Once connected, connection liveness is monitored by the
+ // Ping/Pong heartbeat mechanism controlled by KeepAliveInterval — if no data is received
+ // within 2 × KeepAliveInterval, the connection is considered stale and will be closed.
+ //
+ // 0 means no application-level timeout, the actual TCP connection timeout
+ // will fall back to the OS kernel's TCP settings (on Linux, controlled by
+ // net.ipv4.tcp_syn_retries, which defaults to 6 retries, ~127s in total).
+ // If your application is sensitive to service disruption, set this explicitly (e.g., 10s or 15s).
ConnectionTimeout time.Duration
// Set the operation timeout (default: 30 seconds)
@@ -99,6 +109,11 @@ type ClientOptions struct {
OperationTimeout time.Duration
// Configure the ping send and check interval, default to 30 seconds.
+ //
+ // The client sends PING every KeepAliveInterval and considers the connection stale if no data
+ // is received within 2 × KeepAliveInterval, then closes it and triggers automatic reconnection.
+ // The timeout for the reconnection TCP dial is controlled by ConnectionTimeout.
+ // To speed up reconnection, reduce this value (e.g., 10s or 15s).
KeepAliveInterval time.Duration
// Configure the authentication provider. (default: no authentication)
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 30754044..444d21b3 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -1238,3 +1238,33 @@ func TestMultipleCloseClient(t *testing.T) {
client.Close()
client.Close()
}
+
+func TestDefaultConnectionTimeout(t *testing.T) {
+ // Verify that the default ConnectionTimeout is 0 (no application-level timeout)
+ // when the user does not explicitly set it.
+ cli, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer cli.Close()
+
+ pool := cli.(*client).cnxPool
+ connectionTimeout := internal.GetConnectionTimeout(&pool)
+ assert.Equal(t, time.Duration(0), connectionTimeout,
+ "Default ConnectionTimeout should be 0 (no application-level timeout)")
+}
+
+func TestDefaultKeepAliveInterval(t *testing.T) {
+ // Verify that the default KeepAliveInterval is 30s
+ // when the user does not explicitly set it.
+ cli, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer cli.Close()
+
+ pool := cli.(*client).cnxPool
+ keepAliveInterval := internal.GetKeepAliveInterval(&pool)
+ assert.Equal(t, 30*time.Second, keepAliveInterval,
+ "Default KeepAliveInterval should be 30s")
+}
diff --git a/pulsar/internal/helper.go b/pulsar/internal/helper.go
index 3bca1ee0..ca13f229 100644
--- a/pulsar/internal/helper.go
+++ b/pulsar/internal/helper.go
@@ -31,3 +31,13 @@ func GetConnectionsCount(p *ConnectionPool) int {
defer pool.Unlock()
return len(pool.connections)
}
+
+func GetConnectionTimeout(p *ConnectionPool) time.Duration {
+ pool := (*p).(*connectionPool)
+ return pool.connectionTimeout
+}
+
+func GetKeepAliveInterval(p *ConnectionPool) time.Duration {
+ pool := (*p).(*connectionPool)
+ return pool.keepAliveInterval
+}