This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8e95f43 Improve error handling logic for effectively once (#5271) 8e95f43 is described below commit 8e95f438acce495688f6e99f1e3034da572eab07 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Fri Oct 4 20:43:02 2019 -0700 Improve error handling logic for effectively once (#5271) * Bug in Message Deduplication that may cause incorrect behavior * add tests * fix error message * fix client backoff * fix tests * cleaning up * Fix handling of BK write failures for message dedup * tests and clean up * refactoring code * fixing bugs * addressing comments * add missing license header --- .../service/persistent/MessageDeduplication.java | 34 ++- .../broker/service/persistent/PersistentTopic.java | 125 ++++++--- .../service/persistent/MessageDuplicationTest.java | 180 ++++++++++++- .../client/api/ClientDeduplicationFailureTest.java | 278 +++++++++++++++++++++ .../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 37 ++- 5 files changed, 602 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 1a685b2..e898b4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,17 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; import 
org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; @@ -41,14 +32,22 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.Topic.PublishContext; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; /** * Class that contains all the logic to control and perform the deduplication on the broker side @@ -347,6 +346,17 @@ public class MessageDeduplication { } } + public void resetHighestSequenceIdPushed() { + if (!isEnabled()) { + return; + } + + highestSequencedPushed.clear(); + for (String producer : highestSequencedPersisted.keys()) { + highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer)); + } + } + private void takeSnapshot(PositionImpl position) { if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 12792e8..dbc5bf2 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -19,22 +19,12 @@ package org.apache.pulsar.broker.service.persistent; import com.carrotsearch.hppc.ObjectObjectHashMap; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -117,6 +107,21 @@ import org.apache.pulsar.utils.StatsOutputStream; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiFunction; + import static com.google.common.base.Preconditions.checkArgument; import static 
com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -145,7 +150,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty(); public static final int MESSAGE_RATE_BACKOFF_MS = 1000; - private final MessageDeduplication messageDeduplication; + protected final MessageDeduplication messageDeduplication; private static final long COMPACTION_NEVER_RUN = -0xfebecffeL; private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); @@ -163,6 +168,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } }; + private final AtomicLong pendingWriteOps = new AtomicLong(0); + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -238,6 +245,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal checkReplicatedSubscriptionControllerState(); } + // for testing purposes + @VisibleForTesting + PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, MessageDeduplication messageDeduplication) { + super(topic, brokerService); + this.ledger = ledger; + this.messageDeduplication = messageDeduplication; + this.subscriptions = new ConcurrentOpenHashMap<>(16, 1); + this.replicators = new ConcurrentOpenHashMap<>(16, 1); + this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient()); + } + private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) { synchronized (dispatchRateLimiter) { // dispatch rate limiter for topic @@ -272,17 +290,41 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) { + pendingWriteOps.incrementAndGet(); + if (isFenced) { + publishContext.completed(new 
TopicFencedException("fenced"), -1, -1); + decrementPendingWriteOpsAndCheck(); + return; + } + MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); - switch (status){ + switch (status) { case NotDup: ledger.asyncAddEntry(headersAndPayload, this, publishContext); break; case Dup: // Immediately acknowledge duplicated message publishContext.completed(null, -1, -1); + decrementPendingWriteOpsAndCheck(); break; default: publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1); + decrementPendingWriteOpsAndCheck(); + + } + } + + private void decrementPendingWriteOpsAndCheck() { + long pending = pendingWriteOps.decrementAndGet(); + if (pending == 0 && isFenced) { + synchronized (this) { + if (isFenced) { + messageDeduplication.resetHighestSequenceIdPushed(); + log.info("[{}] Un-fencing topic...", topic); + isFenced = false; + } + + } } } @@ -294,35 +336,50 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position); publishContext.completed(null, position.getLedgerId(), position.getEntryId()); + + decrementPendingWriteOpsAndCheck(); } @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - PublishContext callback = (PublishContext) ctx; - - if (exception instanceof ManagedLedgerAlreadyClosedException) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); - } - - callback.completed(new TopicClosedException(exception), -1, -1); - return; - - } else { - log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); - } + public synchronized void addFailed(ManagedLedgerException exception, Object ctx) { - if (exception instanceof ManagedLedgerTerminatedException) { - // Signal the producer that this topic is no longer available - 
callback.completed(new TopicTerminatedException(exception), -1, -1); - } else { - // Use generic persistence exception - callback.completed(new PersistenceException(exception), -1, -1); - } + // fence topic when failed to write a message to BK + isFenced = true; if (exception instanceof ManagedLedgerFencedException) { // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen close(); + } else { + + // close all producers + List<CompletableFuture<Void>> futures = Lists.newArrayList(); + producers.forEach(producer -> futures.add(producer.disconnect())); + FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> { + decrementPendingWriteOpsAndCheck(); + return null; + }); + + PublishContext callback = (PublishContext) ctx; + + if (exception instanceof ManagedLedgerAlreadyClosedException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); + } + + callback.completed(new TopicClosedException(exception), -1, -1); + return; + + } else { + log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); + } + + if (exception instanceof ManagedLedgerTerminatedException) { + // Signal the producer that this topic is no longer available + callback.completed(new TopicTerminatedException(exception), -1, -1); + } else { + // Use generic persistence exception + callback.completed(new PersistenceException(exception), -1, -1); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index aa6e9d4..a29de11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -21,17 +21,29 @@ package 
org.apache.pulsar.broker.service.persistent; import io.netty.buffer.ByteBuf; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.protocol.Commands; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.annotations.Test; +import java.util.concurrent.ScheduledExecutorService; + import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -115,6 +127,170 @@ public class MessageDuplicationTest { assertEquals(lastSequenceIdPushed.longValue(), 5); } + @Test + public void testIsDuplicateWithFailure() { + + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL); + serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS); + serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX); + + doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); + + ManagedLedger managedLedger = 
mock(ManagedLedger.class); + MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger)); + doReturn(true).when(messageDeduplication).isEnabled(); + + + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + Runnable test = (Runnable) args[0]; + test.run(); + return null; + } + }).when(scheduledExecutorService).submit(any(Runnable.class)); + + BrokerService brokerService = mock(BrokerService.class); + doReturn(scheduledExecutorService).when(brokerService).executor(); + doReturn(pulsarService).when(brokerService).pulsar(); + + PersistentTopic persistentTopic = spy(new PersistentTopic("topic-1", brokerService, managedLedger, messageDeduplication)); + + String producerName1 = "producer1"; + ByteBuf byteBuf1 = getMessage(producerName1, 0); + Topic.PublishContext publishContext1 = getPublishContext(producerName1, 0); + + String producerName2 = "producer2"; + ByteBuf byteBuf2 = getMessage(producerName2, 1); + Topic.PublishContext publishContext2 = getPublishContext(producerName2, 1); + + persistentTopic.publishMessage(byteBuf1, publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 1), publishContext1); + verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), any(), any()); + Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 0); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 0); + + persistentTopic.publishMessage(byteBuf2, publishContext2); + persistentTopic.addComplete(new PositionImpl(0, 2), 
publishContext2); + verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), any(), any()); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 1); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 1); + + byteBuf1 = getMessage(producerName1, 1); + publishContext1 = getPublishContext(producerName1, 1); + persistentTopic.publishMessage(byteBuf1, publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 3), publishContext1); + verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), any(), any()); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 1); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 1); + + byteBuf1 = getMessage(producerName1, 5); + publishContext1 = getPublishContext(producerName1, 5); + persistentTopic.publishMessage(byteBuf1, publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 4), publishContext1); + verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any()); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 5); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 5); + + // publish dup + byteBuf1 = getMessage(producerName1, 0); + publishContext1 = getPublishContext(producerName1, 0); + 
persistentTopic.publishMessage(byteBuf1, publishContext1); + verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any()); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 5); + verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L)); + + // publish message unknown dup status + byteBuf1 = getMessage(producerName1, 6); + publishContext1 = getPublishContext(producerName1, 6); + // don't complete message + persistentTopic.publishMessage(byteBuf1, publishContext1); + verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any()); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 6); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 5); + + // publish same message again + byteBuf1 = getMessage(producerName1, 6); + publishContext1 = getPublishContext(producerName1, 6); + persistentTopic.publishMessage(byteBuf1, publishContext1); + verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any()); + verify(publishContext1, times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class), eq(-1L), eq(-1L)); + + // complete seq 6 message eventually + persistentTopic.addComplete(new PositionImpl(0, 5), publishContext1); + + // simulate failure + byteBuf1 = getMessage(producerName1, 7); + publishContext1 = getPublishContext(producerName1, 7); + persistentTopic.publishMessage(byteBuf1, publishContext1); + verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any()); + + persistentTopic.addFailed(new ManagedLedgerException("test"), publishContext1); + // check highestSequencedPushed is 
reset + assertEquals(messageDeduplication.highestSequencedPushed.size(), 2); + assertEquals(messageDeduplication.highestSequencedPersisted.size(), 2); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertEquals(lastSequenceIdPushed.longValue(), 6); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); + assertEquals(lastSequenceIdPushed.longValue(), 6); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2); + assertEquals(lastSequenceIdPushed.longValue(), 1); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2); + assertEquals(lastSequenceIdPushed.longValue(), 1); + verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed(); + + // try dup + byteBuf1 = getMessage(producerName1, 6); + publishContext1 = getPublishContext(producerName1, 6); + persistentTopic.publishMessage(byteBuf1, publishContext1); + verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any()); + verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L)); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 6); + + // try new message + byteBuf1 = getMessage(producerName1, 8); + publishContext1 = getPublishContext(producerName1, 8); + persistentTopic.publishMessage(byteBuf1, publishContext1); + verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), any(), any()); + persistentTopic.addComplete(new PositionImpl(0, 5), publishContext1); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + assertEquals(lastSequenceIdPushed.longValue(), 8); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); + assertTrue(lastSequenceIdPushed != null); + 
assertEquals(lastSequenceIdPushed.longValue(), 8); + + } + public ByteBuf getMessage(String producerName, long seqId) { PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder() .setProducerName(producerName).setSequenceId(seqId) @@ -127,7 +303,7 @@ public class MessageDuplicationTest { } public Topic.PublishContext getPublishContext(String producerName, long seqId) { - return new Topic.PublishContext() { + return spy(new Topic.PublishContext() { @Override public String getProducerName() { return producerName; @@ -141,6 +317,6 @@ public class MessageDuplicationTest { public void completed(Exception e, long ledgerId, long entryId) { } - }; + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java new file mode 100644 index 0000000..55020c6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.io.PulsarFunctionE2ETest; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.net.URL; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class ClientDeduplicationFailureTest { + LocalBookkeeperEnsemble bkEnsemble; + + ServiceConfiguration config; + URL url; + PulsarService pulsar; + PulsarAdmin admin; + PulsarClient pulsarClient; + BrokerStats brokerStatsClient; + final String tenant = "external-repl-prop"; + 
String primaryHost; + + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + private final int brokerWebServicePort = PortManager.nextFreePort(); + private final int brokerServicePort = PortManager.nextFreePort(); + + private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class); + + @BeforeMethod(timeOut = 300000) + void setup(Method method) throws Exception { + log.info("--- Setting up method {} ---", method.getName()); + + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager::nextFreePort); + bkEnsemble.start(); + + String brokerServiceUrl = "http://127.0.0.1:" + brokerWebServicePort; + + config = spy(new ServiceConfiguration()); + config.setClusterName("use"); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePort)); + config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config.setBrokerServicePort(Optional.ofNullable(brokerServicePort)); + config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + config.setTlsAllowInsecureConnection(true); + config.setAdvertisedAddress("localhost"); + config.setLoadBalancerSheddingEnabled(false); + + config.setAllowAutoTopicCreationType("non-partitioned"); + + url = new URL(brokerServiceUrl); + pulsar = new PulsarService(config, Optional.empty()); + pulsar.start(); + + admin = PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build(); + + brokerStatsClient = admin.brokerStats(); + primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort); + + // update cluster metadata + ClusterData clusterData = new ClusterData(url.toString()); + admin.clusters().createCluster(config.getClusterName(), clusterData); + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get()).maxBackoffInterval(1, TimeUnit.SECONDS); + pulsarClient = clientBuilder.build(); + + TenantInfo tenantInfo = new TenantInfo(); + 
tenantInfo.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); + admin.tenants().createTenant(tenant, tenantInfo); + } + + @AfterMethod + void shutdown() throws Exception { + log.info("--- Shutting down ---"); + pulsarClient.close(); + admin.close(); + pulsar.close(); + bkEnsemble.stop(); + } + + @Test + public void testClientDeduplicationWithBkFailure() throws Exception { + final String namespacePortion = "dedup"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; + final String subscriptionName1 = "sub1"; + final String subscriptionName2 = "sub2"; + final String consumerName1 = "test-consumer-1"; + final String consumerName2 = "test-consumer-2"; + final List<Message<String>> msgRecvd = new LinkedList<>(); + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + admin.namespaces().setDeduplicationStatus(replNamespace, true); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic) + .producerName("test-producer-1").create(); + Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic) + .consumerName(consumerName1).subscriptionName(subscriptionName1).subscribe(); + Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic) + .consumerName(consumerName2).subscriptionName(subscriptionName2).subscribe(); + + new Thread(() -> { + while(true) { + try { + Message<String> msg = consumer2.receive(); + msgRecvd.add(msg); + consumer2.acknowledge(msg); + } catch (PulsarClientException e) { + log.error("Failed to consume message: {}", e, e); + } + } + }).start(); + + retryStrategically((test) -> { + try { + TopicStats topicStats = admin.topics().getStats(sourceTopic); + boolean c1 = topicStats!= null + && 
topicStats.subscriptions.get(subscriptionName1) != null + && topicStats.subscriptions.get(subscriptionName1).consumers.size() == 1 + && topicStats.subscriptions.get(subscriptionName1).consumers.get(0).consumerName.equals(consumerName1); + + boolean c2 = topicStats!= null + && topicStats.subscriptions.get(subscriptionName2) != null + && topicStats.subscriptions.get(subscriptionName2).consumers.size() == 1 + && topicStats.subscriptions.get(subscriptionName2).consumers.get(0).consumerName.equals(consumerName2); + return c1 && c2; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + TopicStats topicStats1 = admin.topics().getStats(sourceTopic); + assertTrue(topicStats1!= null); + assertTrue(topicStats1.subscriptions.get(subscriptionName1) != null); + assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.size(), 1); + assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.get(0).consumerName, consumerName1); + TopicStats topicStats2 = admin.topics().getStats(sourceTopic); + assertTrue(topicStats2!= null); + assertTrue(topicStats2.subscriptions.get(subscriptionName2) != null); + assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.size(), 1); + assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.get(0).consumerName, consumerName2); + + for (int i=0; i<10; i++) { + producer.newMessage().sequenceId(i).value("foo-" + i).send(); + } + + for (int i=0; i<10; i++) { + Message<String> msg = consumer1.receive(); + consumer1.acknowledge(msg); + assertEquals(msg.getValue(), "foo-" + i); + assertEquals(msg.getSequenceId(), i); + } + + log.info("Stopping BK..."); + bkEnsemble.stopBK(); + + List<CompletableFuture<MessageId>> futures = new LinkedList<>(); + for (int i=10; i<20; i++) { + CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync(); + int finalI = i; + future.thenRun(() -> log.error("message: {} successful", 
finalI)).exceptionally((Function<Throwable, Void>) throwable -> { + log.info("message: {} failed: {}", finalI, throwable, throwable); + return null; + }); + futures.add(future); + } + + for (int i = 0; i < futures.size(); i++) { + try { + // message should not be produced successfully + futures.get(i).join(); + fail(); + } catch (CompletionException ex) { + + } catch (Exception e) { + fail(); + } + } + + try { + producer.newMessage().sequenceId(10).value("foo-10").send(); + fail(); + } catch (PulsarClientException ex) { + + } + + try { + producer.newMessage().sequenceId(10).value("foo-10").send(); + fail(); + } catch (PulsarClientException ex) { + + } + + log.info("Starting BK..."); + bkEnsemble.startBK(); + + for (int i=20; i<30; i++) { + producer.newMessage().sequenceId(i).value("foo-" + i).send(); + } + + MessageId lastMessageId = null; + for (int i=20; i<30; i++) { + Message<String> msg = consumer1.receive(); + lastMessageId = msg.getMessageId(); + consumer1.acknowledge(msg); + assertEquals(msg.getValue(), "foo-" + i); + assertEquals(msg.getSequenceId(), i); + } + + // check all messages + retryStrategically((test) -> msgRecvd.size() >= 20, 5, 200); + + assertEquals(msgRecvd.size(), 20); + for (int i=0; i<10; i++) { + assertEquals(msgRecvd.get(i).getValue(), "foo-" + i); + assertEquals(msgRecvd.get(i).getSequenceId(), i); + } + for (int i=10; i<20; i++) { + assertEquals(msgRecvd.get(i).getValue(), "foo-" + (i + 10)); + assertEquals(msgRecvd.get(i).getSequenceId(), i + 10); + } + + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId; + MessageIdImpl messageId = (MessageIdImpl) consumer1.getLastMessageId(); + + assertEquals(messageId.getLedgerId(), batchMessageId.getLedgerId()); + assertEquals(messageId.getEntryId(), batchMessageId.getEntryId()); + } +} diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 373f8bc..4c4d92d 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -43,6 +43,7 @@ import java.util.function.Supplier; import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.StorageAdminClient; import org.apache.bookkeeper.clients.config.StorageClientSettings; @@ -53,6 +54,7 @@ import org.apache.bookkeeper.common.util.Backoff; import org.apache.bookkeeper.common.util.Backoff.Jitter.Type; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.server.conf.BookieConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; @@ -162,10 +164,6 @@ public class LocalBookkeeperEnsemble { StreamStorageLifecycleComponent streamStorage; Integer streamStoragePort = 4181; - /** - * @param args - */ - private void runZookeeper(int maxCC) throws IOException { // create a ZooKeeper server(dataDir, dataLogDir, port) LOG.info("Starting ZK server"); @@ -399,6 +397,37 @@ public class LocalBookkeeperEnsemble { } }

    /**
     * Shuts down every bookie currently held in {@code bs}.
     *
     * <p>Only the bookies are shut down; this method stops nothing else. Calling it when no
     * bookie has been created yet ({@code bs} is {@code null}, or a slot is still empty) is a
     * no-op for the missing entries instead of a {@link NullPointerException}.
     */
    public void stopBK() {
        // The message names BK only: unlike stop(), nothing besides the bookies is touched here.
        LOG.debug("Local BK stopping ...");
        if (bs == null) {
            // No bookie array exists yet, so there is nothing to shut down.
            return;
        }
        for (BookieServer bookie : bs) {
            if (bookie != null) {
                bookie.shutdown();
            }
        }
    }

    /**
     * Creates and starts one {@link BookieServer} for each index from {@code 0} to
     * {@code numberOfBookies - 1}, using the matching entry of {@code bsConfs} and storing the
     * server in the matching slot of {@code bs}.
     *
     * <p>If constructing a bookie fails with {@link InvalidCookieException}, every child of
     * {@code /ledgers/cookies} is deleted through {@code zkc}, the {@code current/VERSION} file
     * under the first journal directory is removed, and the construction is retried once.
     *
     * @throws Exception if a bookie cannot be created or started, or if the cookie cleanup fails
     */
    public void startBK() throws Exception {
        for (int i = 0; i < numberOfBookies; i++) {

            try {
                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
            } catch (InvalidCookieException e) {
                // InvalidCookieException can happen if the machine IP has changed
                // Since we are running here a local bookie that is always accessed
                // from localhost, we can ignore the error
                LOG.warn("Invalid cookie for local bookie " + i + ", cleaning old cookies and retrying", e);
                for (String path : zkc.getChildren("/ledgers/cookies", false)) {
                    zkc.delete("/ledgers/cookies/" + path, -1);
                }

                // Also clean the on-disk cookie
                File versionFile = new File(new File(bsConfs[i].getJournalDirNames()[0], "current"), "VERSION");
                if (versionFile.exists() && !versionFile.delete()) {
                    // Keep going: the retry below reports the real failure if the cookie still blocks startup.
                    LOG.warn("Could not delete on-disk cookie " + versionFile);
                }

                // Retry to start the bookie after cleaning the old left cookie
                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);

            }
            bs[i].start();
        }
    }

    public void stop() throws Exception { if (null != streamStorage) { LOG.debug("Local bk stream storage stopping ...");