This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 516437e370a [fix][websocket] Fix webSocketPingDurationSeconds config (#19256)
516437e370a is described below
commit 516437e370a711d48fe1d444a0c47e64e7cf2f4b
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Jan 20 10:16:40 2023 +0800
[fix][websocket] Fix webSocketPingDurationSeconds config (#19256)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++
.../pulsar/proxy/server/ProxyConfiguration.java | 6 ++++++
.../pulsar/websocket/AbstractWebSocketHandler.java | 22 +++++++++-------------
.../apache/pulsar/websocket/WebSocketService.java | 4 ----
4 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2789860b3a2..4a9a6e47bf3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2613,6 +2613,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int webSocketSessionIdleTimeoutMillis = 300000;
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. "
+ + "This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1;
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "The maximum size of a text message during parsing in WebSocket proxy."
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index fcee04435fe..a91b6e70f5b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -811,6 +811,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private boolean webSocketServiceEnabled = false;
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. "
+ + "This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1;
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Name of the cluster to which this broker belongs to"
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index e19f1557b15..3eb0a0dfcf8 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerCommand;
-import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -192,18 +191,15 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
- WebSocketProxyConfiguration webSocketProxyConfig = service.getWebSocketProxyConfig();
- if (webSocketProxyConfig != null) {
- int webSocketPingDurationSeconds = webSocketProxyConfig.getWebSocketPingDurationSeconds();
- if (webSocketPingDurationSeconds > 0) {
- pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
- try {
- session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
- } catch (IOException e) {
- log.warn("[{}] WebSocket send ping", getSession().getRemoteAddress(), e);
- }
- }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
- }
+ int webSocketPingDurationSeconds = service.getConfig().getWebSocketPingDurationSeconds();
+ if (webSocketPingDurationSeconds > 0) {
+ pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+ try {
+ session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException e) {
+ log.warn("[{}] WebSocket send ping", getSession().getRemoteAddress(), e);
+ }
+ }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
}
log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic);
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index a0fe8099d01..9a8653029ce 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -69,9 +69,6 @@ public class WebSocketService implements Closeable {
private MetadataStoreExtended configMetadataStore;
private ServiceConfiguration config;
- @Getter
- private WebSocketProxyConfiguration webSocketProxyConfig;
-
@Getter
private Optional<CryptoKeyReader> cryptoKeyReader = Optional.empty();
@@ -83,7 +80,6 @@ public class WebSocketService implements Closeable {
public WebSocketService(WebSocketProxyConfiguration config) {
this(createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
- this.webSocketProxyConfig = config;
}
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {