This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b406518fd26  Return 400 for invalid reader messageId query parameter 
(#25865)
b406518fd26 is described below

commit b406518fd2633c842d63d31ea16fe94e015ccd88
Author: Pratik Katti <[email protected]>
AuthorDate: Mon May 25 20:58:25 2026 +0530

     Return 400 for invalid reader messageId query parameter (#25865)
---
 .../org/apache/pulsar/websocket/ReaderHandler.java | 52 ++++++++----
 .../apache/pulsar/websocket/ReaderHandlerTest.java | 97 +++++++++++++++++++++-
 2 files changed, 132 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index f3ddb8577de..1e28aa1c975 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -127,16 +127,28 @@ public class ReaderHandler extends 
AbstractWebSocketHandler {
             }
             allowConnect = true;
         } catch (Exception e) {
-            log.warn()
-                    .attr("remoteAddr", request.getRemoteAddr())
-                    .attr("remotePort", request.getRemotePort())
-                    .attr("reader", subscription)
-                    .attr("topic", topic)
-                    .exception(e)
-                    .log("Failed in creating reader on topic");
+            int errorCode = getErrorCode(e);
+            boolean isKnownError = errorCode != 
HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+            if (isKnownError) {
+                log.warn()
+                        .attr("remoteAddr", request.getRemoteAddr())
+                        .attr("remotePort", request.getRemotePort())
+                        .attr("reader", subscription)
+                        .attr("topic", topic)
+                        .attr("message", e.getMessage())
+                        .log("Failed in creating reader on topic");
+            } else {
+                log.error()
+                        .attr("remoteAddr", request.getRemoteAddr())
+                        .attr("remotePort", request.getRemotePort())
+                        .attr("reader", subscription)
+                        .attr("topic", topic)
+                        .attr("message", e.getMessage())
+                        .exception(e)
+                        .log("Failed in creating reader on topic");
+            }
             try {
-                
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
-                        "Failed to create reader: " + e.getMessage());
+                response.sendError(errorCode, getErrorMessage(e));
             } catch (IOException e1) {
                 log.warn()
                         .attr("remoteAddr", request.getRemoteAddr())
@@ -385,13 +397,25 @@ public class ReaderHandler extends 
AbstractWebSocketHandler {
         return size;
     }
 
-    private MessageId getMessageId() throws IOException {
+    private MessageId getMessageId() {
         MessageId messageId = MessageId.latest;
-        if (isNotBlank(queryParams.get("messageId"))) {
-            if (queryParams.get("messageId").equals("earliest")) {
+        String messageIdParam = queryParams.get("messageId");
+        if (isNotBlank(messageIdParam)) {
+            if (messageIdParam.equals("earliest")) {
                 messageId = MessageId.earliest;
-            } else if (!queryParams.get("messageId").equals("latest")) {
-                messageId = 
MessageIdImpl.fromByteArray(Base64.getDecoder().decode(queryParams.get("messageId")));
+            } else if (!messageIdParam.equals("latest")) {
+                final byte[] decoded;
+                try {
+                    decoded = Base64.getDecoder().decode(messageIdParam);
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalArgumentException("Invalid messageId 
base64 value", e);
+                }
+
+                try {
+                    messageId = MessageIdImpl.fromByteArray(decoded);
+                } catch (IOException | RuntimeException e) {
+                    throw new IllegalArgumentException("Invalid messageId 
value", e);
+                }
             }
         }
         return messageId;
diff --git 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
index a79899ab6fa..9db1561ec5a 100644
--- 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
+++ 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.websocket;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.io.IOException;
+import java.util.Base64;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -43,12 +48,98 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
 import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.eclipse.jetty.ee8.websocket.server.JettyServerUpgradeResponse;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ReaderHandlerTest {
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testInvalidMessageIdBase64ReturnsBadRequest() throws 
IOException {
+        WebSocketService wss = mock(WebSocketService.class);
+        PulsarClient mockedClient = mock(PulsarClient.class);
+        when(wss.getPulsarClient()).thenReturn(mockedClient);
+        ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+        when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+        // Ensure the chain doesn't NPE after startMessageId() if parsing 
unexpectedly succeeds.
+        
when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+
+        Map<String, String[]> params = new HashMap<>();
+        params.put("messageId", new String[] { "invalidMessageId" });
+
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
+        when(request.getParameterMap()).thenReturn(params);
+
+        JettyServerUpgradeResponse servletUpgradeResponse = 
mock(JettyServerUpgradeResponse.class);
+        new ReaderHandler(wss, request, servletUpgradeResponse);
+
+        verify(servletUpgradeResponse, times(1))
+                .sendError(eq(HttpServletResponse.SC_BAD_REQUEST), 
anyString());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testInvalidMessageIdBytesReturnsBadRequest() throws 
IOException {
+        WebSocketService wss = mock(WebSocketService.class);
+        PulsarClient mockedClient = mock(PulsarClient.class);
+        when(wss.getPulsarClient()).thenReturn(mockedClient);
+        ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+        when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+        // Ensure the chain doesn't NPE after startMessageId() if parsing 
unexpectedly succeeds.
+        
when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+
+        // "AQID" is valid Base64, but it doesn't decode into a valid Pulsar 
MessageId structure.
+        Map<String, String[]> params = new HashMap<>();
+        params.put("messageId", new String[] { "AQID" });
+
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
+        when(request.getParameterMap()).thenReturn(params);
+
+        JettyServerUpgradeResponse servletUpgradeResponse = 
mock(JettyServerUpgradeResponse.class);
+        new ReaderHandler(wss, request, servletUpgradeResponse);
+
+        verify(servletUpgradeResponse, times(1))
+                .sendError(eq(HttpServletResponse.SC_BAD_REQUEST), 
anyString());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testInvalidMessageIdRuntimeParseFailureReturnsBadRequest() 
throws IOException {
+        WebSocketService wss = mock(WebSocketService.class);
+        PulsarClient mockedClient = mock(PulsarClient.class);
+        when(wss.getPulsarClient()).thenReturn(mockedClient);
+        ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+        when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+        // Ensure the chain doesn't NPE after startMessageId() if parsing 
unexpectedly succeeds.
+        
when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+
+        MessageIdData invalidBatchMessageId = new MessageIdData()
+                .setLedgerId(1)
+                .setEntryId(2)
+                .setBatchIndex(0)
+                .setBatchSize(-1);
+        Map<String, String[]> params = new HashMap<>();
+        params.put("messageId", new String[] {
+                
Base64.getEncoder().encodeToString(invalidBatchMessageId.toByteArray()) });
+
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
+        when(request.getParameterMap()).thenReturn(params);
+
+        JettyServerUpgradeResponse servletUpgradeResponse = 
mock(JettyServerUpgradeResponse.class);
+        new ReaderHandler(wss, request, servletUpgradeResponse);
+
+        verify(servletUpgradeResponse, times(1))
+                .sendError(eq(HttpServletResponse.SC_BAD_REQUEST), 
anyString());
+    }
+
     @Test
     @SuppressWarnings("unchecked")
     public void testCreateReaderImp() throws IOException {
@@ -68,7 +159,7 @@ public class ReaderHandlerTest {
         when(consumerImp.getSubscription()).thenReturn(subName);
         when(mockedReader.getConsumer()).thenReturn(consumerImp);
         HttpServletRequest request = mock(HttpServletRequest.class);
-        
when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
+        
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
         // create reader handler
         JettyServerUpgradeResponse servletUpgradeResponse = 
mock(JettyServerUpgradeResponse.class);
         ReaderHandler readerHandler = new ReaderHandler(wss, request, 
servletUpgradeResponse);
@@ -97,7 +188,7 @@ public class ReaderHandlerTest {
         when(consumerImp.getSubscription()).thenReturn(subName);
         when(mockedReader.getMultiTopicsConsumer()).thenReturn(consumerImp);
         HttpServletRequest request = mock(HttpServletRequest.class);
-        
when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
+        
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
         // create reader handler
         JettyServerUpgradeResponse servletUpgradeResponse = 
mock(JettyServerUpgradeResponse.class);
         ReaderHandler readerHandler = new ReaderHandler(wss, request, 
servletUpgradeResponse);
@@ -122,7 +213,7 @@ public class ReaderHandlerTest {
         IllegalReader illegalReader = new IllegalReader();
         when(mockedReaderBuilder.create()).thenReturn(illegalReader);
         HttpServletRequest request = mock(HttpServletRequest.class);
-        
when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
+        
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
         // create reader handler
         JettyServerUpgradeResponse servletUpgradeResponse = 
spy(JettyServerUpgradeResponse.class);
         new ReaderHandler(wss, request, servletUpgradeResponse);

Reply via email to