This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new affd047  Fixes NPE - ``ReplicatedSubscriptionsController`` send marker 
message when enable deduplicated. (#14017)
affd047 is described below

commit affd0476866abd6a50ae41bc4ee8a661f829d68d
Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com>
AuthorDate: Sat Jan 29 01:43:53 2022 +0800

    Fixes NPE - ``ReplicatedSubscriptionsController`` send marker message when 
enable deduplicated. (#14017)
    
    * Fixes marker message cause NPE.
    
    * Fixes marker message cause NPE.
---
 .../service/persistent/MessageDeduplication.java   |  7 ++-
 .../broker/BrokerMessageDeduplicationTest.java     | 73 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e2436bb..9913ddf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -79,7 +79,8 @@ public class MessageDeduplication {
         Failed,
     }
 
-    enum MessageDupStatus {
+    @VisibleForTesting
+    public enum MessageDupStatus {
         // whether a message is a definitely a duplicate or not cannot be 
determined at this time
         Unknown,
         // message is definitely NOT a duplicate
@@ -312,7 +313,7 @@ public class MessageDeduplication {
      * @return true if the message should be published or false if it was 
recognized as a duplicate
      */
     public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf 
headersAndPayload) {
-        if (!isEnabled()) {
+        if (!isEnabled() || publishContext.isMarkerMessage()) {
             return MessageDupStatus.NotDup;
         }
 
@@ -365,7 +366,7 @@ public class MessageDeduplication {
      * Call this method whenever a message is persisted to get the chance to 
trigger a snapshot.
      */
     public void recordMessagePersisted(PublishContext publishContext, 
PositionImpl position) {
-        if (!isEnabled()) {
+        if (!isEnabled() || publishContext.isMarkerMessage()) {
             return;
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
new file mode 100644
index 0000000..f2d7a21
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.testng.annotations.Test;
+
+public class BrokerMessageDeduplicationTest {
+
+    @Test
+    public void markerMessageNotDeduplicated() {
+        PulsarService pulsarService = mock(PulsarService.class);
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        doReturn(configuration).when(pulsarService).getConfiguration();
+        MessageDeduplication deduplication = spy(new 
MessageDeduplication(pulsarService,
+                mock(PersistentTopic.class), mock(ManagedLedger.class)));
+        doReturn(true).when(deduplication).isEnabled();
+        Topic.PublishContext context = mock(Topic.PublishContext.class);
+        doReturn(true).when(context).isMarkerMessage();
+        MessageDeduplication.MessageDupStatus status = 
deduplication.isDuplicate(context, null);
+        assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
+    }
+
+    @Test
+    public void markerMessageNotRecordPersistent() {
+        PulsarService pulsarService = mock(PulsarService.class);
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        doReturn(configuration).when(pulsarService).getConfiguration();
+        MessageDeduplication deduplication = spy(new 
MessageDeduplication(pulsarService,
+                mock(PersistentTopic.class), mock(ManagedLedger.class)));
+        doReturn(true).when(deduplication).isEnabled();
+        Topic.PublishContext context = mock(Topic.PublishContext.class);
+         // marker message don't record message persisted.
+        doReturn(true).when(context).isMarkerMessage();
+        deduplication.recordMessagePersisted(context, null);
+
+        // if is not a marker message, we will get NPE. because context is 
mocked with null value fields.
+        doReturn(false).when(context).isMarkerMessage();
+        try {
+            deduplication.recordMessagePersisted(context, null);
+            fail();
+        } catch (Exception npe) {
+            assertTrue(npe instanceof NullPointerException);
+        }
+    }
+
+
+}

Reply via email to