This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 26c4626 Refactored handling of zero-queue consumer into separate subclass (#3615) 26c4626 is described below commit 26c462625483ef0e0fa3c74bca6e84a3f569115b Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Feb 20 04:05:25 2019 -0800 Refactored handling of zero-queue consumer into separate subclass (#3615) ### Motivation Simplified `ConsumerImpl` code by factoring out a bunch of if/else statements into a specialized subclass implementation. --- .../apache/pulsar/client/impl/ConsumerImpl.java | 220 ++++++--------------- .../client/impl/MultiTopicsConsumerImpl.java | 16 +- .../pulsar/client/impl/PulsarClientImpl.java | 5 +- .../pulsar/client/impl/ZeroQueueConsumerImpl.java | 187 ++++++++++++++++++ .../pulsar/client/impl/ConsumerImplTest.java | 5 +- 5 files changed, 266 insertions(+), 167 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5d5b1fa..7fb4531 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -19,14 +19,13 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; -import static java.lang.String.format; import static org.apache.pulsar.common.api.Commands.hasChecksum; import static org.apache.pulsar.common.api.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; + import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; @@ -100,7 +99,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @SuppressWarnings("unused") private volatile int availablePermits = 0; - private volatile MessageId lastDequeuedMessage = MessageId.earliest; + protected volatile MessageId lastDequeuedMessage = MessageId.earliest; private volatile MessageId lastMessageIdInBroker = MessageId.earliest; private long subscribeTimeout; @@ -108,12 +107,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private final int receiverQueueRefillThreshold; - private volatile boolean waitingOnReceiveForZeroQueueSize = false; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final ReadWriteLock zeroQueueLock; - private final UnAckedMessageTracker unAckedMessageTracker; private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker; @@ -153,14 +148,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle NonDurable } - ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, - Schema<T> schema, ConsumerInterceptors<T> interceptors) { - this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, - schema, interceptors); + static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, + ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, + SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) { + if (conf.getReceiverQueueSize() == 0) { + return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, + subscriptionMode, startMessageId, schema, interceptors); + } else { + return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, + subscriptionMode, startMessageId, schema, interceptors); + } } - ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, + protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) { super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors); @@ -181,12 +181,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle stats = ConsumerStatsDisabled.INSTANCE; } - if (conf.getReceiverQueueSize() <= 1) { - zeroQueueLock = new ReentrantReadWriteLock(); - } else { - zeroQueueLock = null; - } - if (conf.getAckTimeoutMillis() != 0) { if (conf.getTickDurationMillis() > 0) { this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis()); @@ -290,15 +284,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @Override protected Message<T> internalReceive() throws PulsarClientException { - if (conf.getReceiverQueueSize() == 0) { - checkArgument(zeroQueueLock != null, "Receiver queue size can't be modified"); - zeroQueueLock.writeLock().lock(); - try { - return fetchSingleMessageFromBroker(); - } finally { - zeroQueueLock.writeLock().unlock(); - } - } Message<T> message; try { message = incomingMessages.take(); @@ -331,9 +316,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle lock.writeLock().unlock(); } - if (message == null && conf.getReceiverQueueSize() == 0) { - sendFlowPermitsToBroker(cnx(), 1); - } else if (message != null) { + if (message != null) { trackMessage(message); Message<T> interceptMsg = beforeConsume(message); messageProcessed(interceptMsg); @@ -343,53 +326,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return result; } - private Message<T> fetchSingleMessageFromBroker() throws PulsarClientException { - checkArgument(conf.getReceiverQueueSize() == 0); - - // Just being cautious - if (incomingMessages.size() > 0) { - log.error("The incoming message queue should never be greater than 0 when Queue size is 0"); - incomingMessages.clear(); - } - - Message<T> message; - try { - // if cnx is null or if the connection breaks the connectionOpened function will send the flow again - waitingOnReceiveForZeroQueueSize = true; - synchronized (this) { - if (isConnected()) { - sendFlowPermitsToBroker(cnx(), 1); - } - } - do { - message = incomingMessages.take(); - lastDequeuedMessage = message.getMessageId(); - ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx(); - // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" - synchronized (ConsumerImpl.this) { - // if message received due to an old flow - discard it and wait for the message from the - // latest flow command - if (msgCnx == cnx()) { - waitingOnReceiveForZeroQueueSize = false; - break; - } - } - } while (true); - - stats.updateNumMsgsReceived(message); - return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - stats.incrementNumReceiveFailed(); - throw new PulsarClientException(e); - } finally { - // Finally blocked is invoked in case the block on incomingMessages is interrupted - waitingOnReceiveForZeroQueueSize = false; - // Clearing the queue in case there was a race with messageReceived - incomingMessages.clear(); - } - } - @Override protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException { Message<T> message; @@ -567,16 +503,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { if (changeToReadyState()) { - log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription, - cnx.channel().remoteAddress(), consumerId); - - AVAILABLE_PERMITS_UPDATER.set(this, 0); - // For zerosize queue : If the connection is reset and someone is waiting for the messages - // or queue was not empty: send a flow command - if (waitingOnReceiveForZeroQueueSize - || (conf.getReceiverQueueSize() == 0 && (currentSize > 0 || listener != null))) { - sendFlowPermitsToBroker(cnx, 1); - } + consumerIsReconnectedToBroker(cnx, currentSize); } else { // Consumer was closed while reconnecting, close the connection to make sure the broker // drops the consumer on its side @@ -623,6 +550,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } + protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) { + log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription, + cnx.channel().remoteAddress(), consumerId); + + AVAILABLE_PERMITS_UPDATER.set(this, 0); + } + /** * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was * not seen by the application @@ -846,68 +780,61 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (!pendingReceives.isEmpty()) { trackMessage(message); notifyPendingReceivedCallback(message, null); - } else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) { + } else if (canEnqueueMessage(message)) { incomingMessages.add(message); - } else if (conf.getReceiverQueueSize() == 0 && listener != null) { - triggerZeroQueueSizeListener(message); } } finally { lock.readLock().unlock(); } } else { - if (conf.getReceiverQueueSize() == 0) { - log.warn( - "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", - subscription, consumerName); - // close connection - closeAsync().handle((ok, e) -> { - // notify callback with failure result - notifyPendingReceivedCallback(null, - new PulsarClientException.InvalidMessageException( - format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", - subscription, consumerName))); - return null; - }); - } else { - // handle batch message enqueuing; uncompressed payload has all messages in batch - receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx); - } + // handle batch message enqueuing; uncompressed payload has all messages in batch + receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx); + uncompressedPayload.release(); msgMetadata.recycle(); } - if (listener != null && conf.getReceiverQueueSize() != 0) { - // Trigger the notification on the message listener in a separate thread to avoid blocking the networking - // thread while the message processing happens - listenerExecutor.execute(() -> { - for (int i = 0; i < numMessages; i++) { - try { - Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); - // complete the callback-loop in case queue is cleared up - if (msg == null) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); - } - break; + if (listener != null) { + triggerListener(numMessages); + } + } + + protected void triggerListener(int numMessages) { + // Trigger the notification on the message listener in a separate thread to avoid blocking the networking + // thread while the message processing happens + listenerExecutor.execute(() -> { + for (int i = 0; i < numMessages; i++) { + try { + Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); + // complete the callback-loop in case queue is cleared up + if (msg == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); } - try { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, - msg.getMessageId()); - } - listener.received(ConsumerImpl.this, msg); - } catch (Throwable t) { - log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, - msg.getMessageId(), t); + break; + } + try { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, + msg.getMessageId()); } - - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; + listener.received(ConsumerImpl.this, msg); + } catch (Throwable t) { + log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, + msg.getMessageId(), t); } + + } catch (PulsarClientException e) { + log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); + return; } - }); - } + } + }); + } + + protected boolean canEnqueueMessage(Message<T> message) { + // Default behavior, can be overridden in subclasses + return true; } /** @@ -956,27 +883,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage)); } - private void triggerZeroQueueSizeListener(final Message<T> message) { - checkArgument(conf.getReceiverQueueSize() == 0); - checkNotNull(listener, "listener can't be null"); - checkNotNull(message, "unqueued message can't be null"); - - listenerExecutor.execute(() -> { - stats.updateNumMsgsReceived(message); - try { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription, - message.getMessageId()); - } - listener.received(ConsumerImpl.this, message); - } catch (Throwable t) { - log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription, - message.getMessageId(), t); - } - increaseAvailablePermits(cnx()); - }); - } - void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { int batchSize = msgMetadata.getNumMessagesInBatch(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 39c0511..ddd44d2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -716,8 +717,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { partitionIndex -> { String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, configurationData, - client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema, interceptors); + ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName, + configurationData, client.externalExecutorProvider().getExecutor(), + partitionIndex, subFuture, + SubscriptionMode.Durable, null, schema, interceptors); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); return subFuture; }) @@ -727,8 +730,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { allTopicPartitionsNumber.incrementAndGet(); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig, - client.externalExecutorProvider().getExecutor(), 0, subFuture, schema, interceptors); + ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, + client.externalExecutorProvider().getExecutor(), 0, subFuture, SubscriptionMode.Durable, null, + schema, interceptors); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); futureList = Collections.singletonList(subFuture); @@ -941,10 +945,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { int partitionIndex = TopicName.getPartitionIndex(partitionName); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>( + ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl( client, partitionName, configurationData, client.externalExecutorProvider().getExecutor(), - partitionIndex, subFuture, schema, interceptors); + partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); if (log.isDebugEnabled()) { log.debug("[{}] create consumer {} for partitionName: {}", diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 63b4071..a224eb4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -332,8 +333,8 @@ public class PulsarClientImpl implements PulsarClient { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); } else { - consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1, - consumerSubscribedFuture, schema, interceptors); + consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1, + consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors); } synchronized (consumers) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java new file mode 100644 index 0000000..09b610f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +import io.netty.buffer.ByteBuf; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; + +@Slf4j +public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { + + private final Lock zeroQueueLock = new ReentrantLock(); + + private volatile boolean waitingOnReceiveForZeroQueueSize = false; + + public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, + ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, + SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, + ConsumerInterceptors<T> interceptors) { + super(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId, + schema, interceptors); + } + + @Override + protected Message<T> internalReceive() throws PulsarClientException { + zeroQueueLock.lock(); + try { + return fetchSingleMessageFromBroker(); + } finally { + zeroQueueLock.unlock(); + } + } + + @Override + protected CompletableFuture<Message<T>> internalReceiveAsync() { + CompletableFuture<Message<T>> future = super.internalReceiveAsync(); + if (!future.isDone()) { + // We expect the message to be not in the queue yet + sendFlowPermitsToBroker(cnx(), 1); + } + + return future; + } + + private Message<T> fetchSingleMessageFromBroker() throws PulsarClientException { + // Just being cautious + if (incomingMessages.size() > 0) { + log.error("The incoming message queue should never be greater than 0 when Queue size is 0"); + incomingMessages.clear(); + } + + Message<T> message; + try { + // if cnx is null or if the connection breaks the connectionOpened function will send the flow again + waitingOnReceiveForZeroQueueSize = true; + synchronized (this) { + if (isConnected()) { + sendFlowPermitsToBroker(cnx(), 1); + } + } + do { + message = incomingMessages.take(); + lastDequeuedMessage = message.getMessageId(); + ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx(); + // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" + synchronized (this) { + // if message received due to an old flow - discard it and wait for the message from the + // latest flow command + if (msgCnx == cnx()) { + waitingOnReceiveForZeroQueueSize = false; + break; + } + } + } while (true); + + stats.updateNumMsgsReceived(message); + return message; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + stats.incrementNumReceiveFailed(); + throw new PulsarClientException(e); + } finally { + // Finally blocked is invoked in case the block on incomingMessages is interrupted + waitingOnReceiveForZeroQueueSize = false; + // Clearing the queue in case there was a race with messageReceived + incomingMessages.clear(); + } + } + + @Override + protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) { + super.consumerIsReconnectedToBroker(cnx, currentQueueSize); + + // For zerosize queue : If the connection is reset and someone is waiting for the messages + // or queue was not empty: send a flow command + if (waitingOnReceiveForZeroQueueSize + || currentQueueSize > 0 + || listener != null) { + sendFlowPermitsToBroker(cnx, 1); + } + } + + @Override + protected boolean canEnqueueMessage(Message<T> message) { + if (listener != null) { + triggerZeroQueueSizeListener(message); + return false; + } else { + return true; + } + } + + private void triggerZeroQueueSizeListener(final Message<T> message) { + checkNotNull(listener, "listener can't be null"); + checkNotNull(message, "unqueued message can't be null"); + + listenerExecutor.execute(() -> { + stats.updateNumMsgsReceived(message); + try { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription, + message.getMessageId()); + } + listener.received(ZeroQueueConsumerImpl.this, message); + } catch (Throwable t) { + log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription, + message.getMessageId(), t); + } + increaseAvailablePermits(cnx()); + }); + } + + @Override + protected void triggerListener(int numMessages) { + // Ignore since it was already triggered in the triggerZeroQueueSizeListener() call + } + + @Override + void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, + ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { + log.warn( + "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", + subscription, consumerName); + // close connection + closeAsync().handle((ok, e) -> { + // notify callback with failure result + notifyPendingReceivedCallback(null, + new PulsarClientException.InvalidMessageException( + format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", + subscription, consumerName))); + return null; + }); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 657b4f4..e09184c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.testng.Assert; @@ -56,8 +57,8 @@ public class ConsumerImplTest { when(client.getConfiguration()).thenReturn(clientConf); consumerConf.setSubscriptionName("test-sub"); - consumer = new ConsumerImpl<ConsumerImpl>(client, topic, consumerConf, - executorService, -1, subscribeFuture, null, null); + consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf, + executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null); } @Test(invocationTimeOut = 1000)