This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new affd047 Fixes NPE - ``ReplicatedSubscriptionsController`` send marker message when enable deduplicated. (#14017) affd047 is described below commit affd0476866abd6a50ae41bc4ee8a661f829d68d Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com> AuthorDate: Sat Jan 29 01:43:53 2022 +0800 Fixes NPE - ``ReplicatedSubscriptionsController`` send marker message when enable deduplicated. (#14017) * Fixes marker message cause NPE. * Fixes marker message cause NPE. --- .../service/persistent/MessageDeduplication.java | 7 ++- .../broker/BrokerMessageDeduplicationTest.java | 73 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e2436bb..9913ddf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -79,7 +79,8 @@ public class MessageDeduplication { Failed, } - enum MessageDupStatus { + @VisibleForTesting + public enum MessageDupStatus { // whether a message is a definitely a duplicate or not cannot be determined at this time Unknown, // message is definitely NOT a duplicate @@ -312,7 +313,7 @@ public class MessageDeduplication { * @return true if the message should be published or false if it was recognized as a duplicate */ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) { - if (!isEnabled()) { + if (!isEnabled() || publishContext.isMarkerMessage()) { return MessageDupStatus.NotDup; } @@ -365,7 +366,7 @@ public class MessageDeduplication { * Call this method whenever a message is persisted to get the chance to trigger a snapshot. */ public void recordMessagePersisted(PublishContext publishContext, PositionImpl position) { - if (!isEnabled()) { + if (!isEnabled() || publishContext.isMarkerMessage()) { return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java new file mode 100644 index 0000000..f2d7a21 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.testng.annotations.Test; + +public class BrokerMessageDeduplicationTest { + + @Test + public void markerMessageNotDeduplicated() { + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration configuration = new ServiceConfiguration(); + doReturn(configuration).when(pulsarService).getConfiguration(); + MessageDeduplication deduplication = spy(new MessageDeduplication(pulsarService, + mock(PersistentTopic.class), mock(ManagedLedger.class))); + doReturn(true).when(deduplication).isEnabled(); + Topic.PublishContext context = mock(Topic.PublishContext.class); + doReturn(true).when(context).isMarkerMessage(); + MessageDeduplication.MessageDupStatus status = deduplication.isDuplicate(context, null); + assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); + } + + @Test + public void markerMessageNotRecordPersistent() { + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration configuration = new ServiceConfiguration(); + doReturn(configuration).when(pulsarService).getConfiguration(); + MessageDeduplication deduplication = spy(new MessageDeduplication(pulsarService, + mock(PersistentTopic.class), mock(ManagedLedger.class))); + doReturn(true).when(deduplication).isEnabled(); + Topic.PublishContext context = mock(Topic.PublishContext.class); + // marker message don't record message persisted. + doReturn(true).when(context).isMarkerMessage(); + deduplication.recordMessagePersisted(context, null); + + // if is not a marker message, we will get NPE. because context is mocked with null value fields. + doReturn(false).when(context).isMarkerMessage(); + try { + deduplication.recordMessagePersisted(context, null); + fail(); + } catch (Exception npe) { + assertTrue(npe instanceof NullPointerException); + } + } + + +}