This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c4626  Refactored handling of zero-queue consumer into separate 
subclass (#3615)
26c4626 is described below

commit 26c462625483ef0e0fa3c74bca6e84a3f569115b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Feb 20 04:05:25 2019 -0800

    Refactored handling of zero-queue consumer into separate subclass (#3615)
    
    ### Motivation
    
    Simplified `ConsumerImpl` code by factoring out a bunch of if/else 
statements into a specialized subclass implementation.
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 220 ++++++---------------
 .../client/impl/MultiTopicsConsumerImpl.java       |  16 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   5 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  | 187 ++++++++++++++++++
 .../pulsar/client/impl/ConsumerImplTest.java       |   5 +-
 5 files changed, 266 insertions(+), 167 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5d5b1fa..7fb4531 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -19,14 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
-import static java.lang.String.format;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
@@ -100,7 +99,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @SuppressWarnings("unused")
     private volatile int availablePermits = 0;
 
-    private volatile MessageId lastDequeuedMessage = MessageId.earliest;
+    protected volatile MessageId lastDequeuedMessage = MessageId.earliest;
     private volatile MessageId lastMessageIdInBroker = MessageId.earliest;
 
     private long subscribeTimeout;
@@ -108,12 +107,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final int receiverQueueRefillThreshold;
 
-    private volatile boolean waitingOnReceiveForZeroQueueSize = false;
-
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private final ReadWriteLock zeroQueueLock;
-
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final AcknowledgmentsGroupingTracker 
acknowledgmentsGroupingTracker;
 
@@ -153,14 +148,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         NonDurable
     }
 
-    ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
-            Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, 
subscribeFuture, SubscriptionMode.Durable, null,
-                schema, interceptors);
+    static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurationData<T> conf,
+                 ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+        if (conf.getReceiverQueueSize() == 0) {
+            return new ZeroQueueConsumerImpl<>(client, topic, conf, 
listenerExecutor, partitionIndex, subscribeFuture,
+                    subscriptionMode, startMessageId, schema, interceptors);
+        } else {
+            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, 
partitionIndex, subscribeFuture,
+                    subscriptionMode, startMessageId, schema, interceptors);
+        }
     }
 
-    ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
+    protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
                  SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         super(client, topic, conf, conf.getReceiverQueueSize(), 
listenerExecutor, subscribeFuture, schema, interceptors);
@@ -181,12 +181,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             stats = ConsumerStatsDisabled.INSTANCE;
         }
 
-        if (conf.getReceiverQueueSize() <= 1) {
-            zeroQueueLock = new ReentrantReadWriteLock();
-        } else {
-            zeroQueueLock = null;
-        }
-
         if (conf.getAckTimeoutMillis() != 0) {
             if (conf.getTickDurationMillis() > 0) {
                 this.unAckedMessageTracker = new UnAckedMessageTracker(client, 
this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
@@ -290,15 +284,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     protected Message<T> internalReceive() throws PulsarClientException {
-        if (conf.getReceiverQueueSize() == 0) {
-            checkArgument(zeroQueueLock != null, "Receiver queue size can't be 
modified");
-            zeroQueueLock.writeLock().lock();
-            try {
-                return fetchSingleMessageFromBroker();
-            } finally {
-                zeroQueueLock.writeLock().unlock();
-            }
-        }
         Message<T> message;
         try {
             message = incomingMessages.take();
@@ -331,9 +316,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             lock.writeLock().unlock();
         }
 
-        if (message == null && conf.getReceiverQueueSize() == 0) {
-            sendFlowPermitsToBroker(cnx(), 1);
-        } else if (message != null) {
+        if (message != null) {
             trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             messageProcessed(interceptMsg);
@@ -343,53 +326,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return result;
     }
 
-    private Message<T> fetchSingleMessageFromBroker() throws 
PulsarClientException {
-        checkArgument(conf.getReceiverQueueSize() == 0);
-
-        // Just being cautious
-        if (incomingMessages.size() > 0) {
-            log.error("The incoming message queue should never be greater than 
0 when Queue size is 0");
-            incomingMessages.clear();
-        }
-
-        Message<T> message;
-        try {
-            // if cnx is null or if the connection breaks the connectionOpened 
function will send the flow again
-            waitingOnReceiveForZeroQueueSize = true;
-            synchronized (this) {
-                if (isConnected()) {
-                    sendFlowPermitsToBroker(cnx(), 1);
-                }
-            }
-            do {
-                message = incomingMessages.take();
-                lastDequeuedMessage = message.getMessageId();
-                ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
-                // synchronized need to prevent race between connectionOpened 
and the check "msgCnx == cnx()"
-                synchronized (ConsumerImpl.this) {
-                    // if message received due to an old flow - discard it and 
wait for the message from the
-                    // latest flow command
-                    if (msgCnx == cnx()) {
-                        waitingOnReceiveForZeroQueueSize = false;
-                        break;
-                    }
-                }
-            } while (true);
-
-            stats.updateNumMsgsReceived(message);
-            return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            stats.incrementNumReceiveFailed();
-            throw new PulsarClientException(e);
-        } finally {
-            // Finally blocked is invoked in case the block on 
incomingMessages is interrupted
-            waitingOnReceiveForZeroQueueSize = false;
-            // Clearing the queue in case there was a race with messageReceived
-            incomingMessages.clear();
-        }
-    }
-
     @Override
     protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
         Message<T> message;
@@ -567,16 +503,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         cnx.sendRequestWithId(request, requestId).thenRun(() -> {
             synchronized (ConsumerImpl.this) {
                 if (changeToReadyState()) {
-                    log.info("[{}][{}] Subscribed to topic on {} -- consumer: 
{}", topic, subscription,
-                            cnx.channel().remoteAddress(), consumerId);
-
-                    AVAILABLE_PERMITS_UPDATER.set(this, 0);
-                    // For zerosize queue : If the connection is reset and 
someone is waiting for the messages
-                    // or queue was not empty: send a flow command
-                    if (waitingOnReceiveForZeroQueueSize
-                            || (conf.getReceiverQueueSize() == 0 && 
(currentSize > 0 || listener != null))) {
-                        sendFlowPermitsToBroker(cnx, 1);
-                    }
+                    consumerIsReconnectedToBroker(cnx, currentSize);
                 } else {
                     // Consumer was closed while reconnecting, close the 
connection to make sure the broker
                     // drops the consumer on its side
@@ -623,6 +550,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         });
     }
 
+    protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
+        log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, 
subscription,
+                cnx.channel().remoteAddress(), consumerId);
+
+        AVAILABLE_PERMITS_UPDATER.set(this, 0);
+    }
+
     /**
      * Clear the internal receiver queue and returns the message id of what 
was the 1st message in the queue that was
      * not seen by the application
@@ -846,68 +780,61 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (!pendingReceives.isEmpty()) {
                     trackMessage(message);
                     notifyPendingReceivedCallback(message, null);
-                } else if (conf.getReceiverQueueSize() != 0 || 
waitingOnReceiveForZeroQueueSize) {
+                } else if (canEnqueueMessage(message)) {
                     incomingMessages.add(message);
-                } else if (conf.getReceiverQueueSize() == 0 && listener != 
null) {
-                    triggerZeroQueueSizeListener(message);
                 }
             } finally {
                 lock.readLock().unlock();
             }
         } else {
-            if (conf.getReceiverQueueSize() == 0) {
-                log.warn(
-                        "Closing consumer [{}]-[{}] due to unsupported 
received batch-message with zero receiver queue size",
-                        subscription, consumerName);
-                // close connection
-                closeAsync().handle((ok, e) -> {
-                    // notify callback with failure result
-                    notifyPendingReceivedCallback(null,
-                            new PulsarClientException.InvalidMessageException(
-                                    format("Unsupported Batch message with 0 
size receiver queue for [%s]-[%s] ",
-                                            subscription, consumerName)));
-                    return null;
-                });
-            } else {
-                // handle batch message enqueuing; uncompressed payload has 
all messages in batch
-                receiveIndividualMessagesFromBatch(msgMetadata, 
redeliveryCount, uncompressedPayload, messageId, cnx);
-            }
+            // handle batch message enqueuing; uncompressed payload has all 
messages in batch
+            receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, 
uncompressedPayload, messageId, cnx);
+
             uncompressedPayload.release();
             msgMetadata.recycle();
         }
 
-        if (listener != null && conf.getReceiverQueueSize() != 0) {
-            // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-            // thread while the message processing happens
-            listenerExecutor.execute(() -> {
-                for (int i = 0; i < numMessages; i++) {
-                    try {
-                        Message<T> msg = internalReceive(0, 
TimeUnit.MILLISECONDS);
-                        // complete the callback-loop in case queue is cleared 
up
-                        if (msg == null) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] [{}] Message has been cleared 
from the queue", topic, subscription);
-                            }
-                            break;
+        if (listener != null) {
+            triggerListener(numMessages);
+        }
+    }
+
+    protected void triggerListener(int numMessages) {
+        // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
+        // thread while the message processing happens
+        listenerExecutor.execute(() -> {
+            for (int i = 0; i < numMessages; i++) {
+                try {
+                    Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                    // complete the callback-loop in case queue is cleared up
+                    if (msg == null) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Message has been cleared from 
the queue", topic, subscription);
                         }
-                        try {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}][{}] Calling message listener 
for message {}", topic, subscription,
-                                        msg.getMessageId());
-                            }
-                            listener.received(ConsumerImpl.this, msg);
-                        } catch (Throwable t) {
-                            log.error("[{}][{}] Message listener error in 
processing message: {}", topic, subscription,
-                                    msg.getMessageId(), t);
+                        break;
+                    }
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}][{}] Calling message listener for 
message {}", topic, subscription,
+                                    msg.getMessageId());
                         }
-
-                    } catch (PulsarClientException e) {
-                        log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                        return;
+                        listener.received(ConsumerImpl.this, msg);
+                    } catch (Throwable t) {
+                        log.error("[{}][{}] Message listener error in 
processing message: {}", topic, subscription,
+                                msg.getMessageId(), t);
                     }
+
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
+                    return;
                 }
-            });
-        }
+            }
+        });
+    }
+
+    protected boolean canEnqueueMessage(Message<T> message) {
+        // Default behavior, can be overridden in subclasses
+        return true;
     }
 
     /**
@@ -956,27 +883,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         listenerExecutor.execute(() -> 
receivedFuture.complete(interceptMessage));
     }
 
-    private void triggerZeroQueueSizeListener(final Message<T> message) {
-        checkArgument(conf.getReceiverQueueSize() == 0);
-        checkNotNull(listener, "listener can't be null");
-        checkNotNull(message, "unqueued message can't be null");
-
-        listenerExecutor.execute(() -> {
-            stats.updateNumMsgsReceived(message);
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Calling message listener for unqueued 
message {}", topic, subscription,
-                            message.getMessageId());
-                }
-                listener.received(ConsumerImpl.this, message);
-            } catch (Throwable t) {
-                log.error("[{}][{}] Message listener error in processing 
unqueued message: {}", topic, subscription,
-                        message.getMessageId(), t);
-            }
-            increaseAvailablePermits(cnx());
-        });
-    }
-
     void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int 
redeliveryCount, ByteBuf uncompressedPayload,
             MessageIdData messageId, ClientCnx cnx) {
         int batchSize = msgMetadata.getNumMessagesInBatch();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 39c0511..ddd44d2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -716,8 +717,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     partitionIndex -> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, configurationData,
-                            client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema, interceptors);
+                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
+                                configurationData, 
client.externalExecutorProvider().getExecutor(),
+                                partitionIndex, subFuture,
+                                SubscriptionMode.Durable, null, schema, 
interceptors);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         return subFuture;
                     })
@@ -727,8 +730,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             allTopicPartitionsNumber.incrementAndGet();
 
             CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-            ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, 
topicName, internalConfig,
-                client.externalExecutorProvider().getExecutor(), 0, subFuture, 
schema, interceptors);
+            ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, 
topicName, internalConfig,
+                    client.externalExecutorProvider().getExecutor(), 0, 
subFuture, SubscriptionMode.Durable, null,
+                    schema, interceptors);
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
             futureList = Collections.singletonList(subFuture);
@@ -941,10 +945,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         int partitionIndex = 
TopicName.getPartitionIndex(partitionName);
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
-                        ConsumerImpl<T> newConsumer = new ConsumerImpl<>(
+                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(),
-                            partitionIndex, subFuture, schema, interceptors);
+                            partitionIndex, subFuture, 
SubscriptionMode.Durable, null, schema, interceptors);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] create consumer {} for 
partitionName: {}",
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 63b4071..a224eb4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -332,8 +333,8 @@ public class PulsarClientImpl implements PulsarClient {
                 consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
                     listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
             } else {
-                consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, 
conf, listenerThread, -1,
-                        consumerSubscribedFuture, schema, interceptors);
+                consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, 
topic, conf, listenerThread, -1,
+                        consumerSubscribedFuture, SubscriptionMode.Durable, 
null, schema, interceptors);
             }
 
             synchronized (consumers) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
new file mode 100644
index 0000000..09b610f
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+@Slf4j
+public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
+
+    private final Lock zeroQueueLock = new ReentrantLock();
+
+    private volatile boolean waitingOnReceiveForZeroQueueSize = false;
+
+    public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
+            SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema,
+            ConsumerInterceptors<T> interceptors) {
+        super(client, topic, conf, listenerExecutor, partitionIndex, 
subscribeFuture, subscriptionMode, startMessageId,
+                schema, interceptors);
+    }
+
+    @Override
+    protected Message<T> internalReceive() throws PulsarClientException {
+        zeroQueueLock.lock();
+        try {
+            return fetchSingleMessageFromBroker();
+        } finally {
+            zeroQueueLock.unlock();
+        }
+    }
+
+    @Override
+    protected CompletableFuture<Message<T>> internalReceiveAsync() {
+        CompletableFuture<Message<T>> future = super.internalReceiveAsync();
+        if (!future.isDone()) {
+            // We expect the message to be not in the queue yet
+            sendFlowPermitsToBroker(cnx(), 1);
+        }
+
+        return future;
+    }
+
+    private Message<T> fetchSingleMessageFromBroker() throws 
PulsarClientException {
+        // Just being cautious
+        if (incomingMessages.size() > 0) {
+            log.error("The incoming message queue should never be greater than 
0 when Queue size is 0");
+            incomingMessages.clear();
+        }
+
+        Message<T> message;
+        try {
+            // if cnx is null or if the connection breaks the connectionOpened 
function will send the flow again
+            waitingOnReceiveForZeroQueueSize = true;
+            synchronized (this) {
+                if (isConnected()) {
+                    sendFlowPermitsToBroker(cnx(), 1);
+                }
+            }
+            do {
+                message = incomingMessages.take();
+                lastDequeuedMessage = message.getMessageId();
+                ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
+                // synchronized need to prevent race between connectionOpened 
and the check "msgCnx == cnx()"
+                synchronized (this) {
+                    // if message received due to an old flow - discard it and 
wait for the message from the
+                    // latest flow command
+                    if (msgCnx == cnx()) {
+                        waitingOnReceiveForZeroQueueSize = false;
+                        break;
+                    }
+                }
+            } while (true);
+
+            stats.updateNumMsgsReceived(message);
+            return message;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            stats.incrementNumReceiveFailed();
+            throw new PulsarClientException(e);
+        } finally {
+            // Finally blocked is invoked in case the block on 
incomingMessages is interrupted
+            waitingOnReceiveForZeroQueueSize = false;
+            // Clearing the queue in case there was a race with messageReceived
+            incomingMessages.clear();
+        }
+    }
+
+    @Override
+    protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
+        super.consumerIsReconnectedToBroker(cnx, currentQueueSize);
+
+        // For zerosize queue : If the connection is reset and someone is 
waiting for the messages
+        // or queue was not empty: send a flow command
+        if (waitingOnReceiveForZeroQueueSize
+                || currentQueueSize > 0
+                || listener != null) {
+            sendFlowPermitsToBroker(cnx, 1);
+        }
+    }
+
+    @Override
+    protected boolean canEnqueueMessage(Message<T> message) {
+        if (listener != null) {
+            triggerZeroQueueSizeListener(message);
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    private void triggerZeroQueueSizeListener(final Message<T> message) {
+        checkNotNull(listener, "listener can't be null");
+        checkNotNull(message, "unqueued message can't be null");
+
+        listenerExecutor.execute(() -> {
+            stats.updateNumMsgsReceived(message);
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Calling message listener for unqueued 
message {}", topic, subscription,
+                            message.getMessageId());
+                }
+                listener.received(ZeroQueueConsumerImpl.this, message);
+            } catch (Throwable t) {
+                log.error("[{}][{}] Message listener error in processing 
unqueued message: {}", topic, subscription,
+                        message.getMessageId(), t);
+            }
+            increaseAvailablePermits(cnx());
+        });
+    }
+
+    @Override
+    protected void triggerListener(int numMessages) {
+        // Ignore since it was already triggered in the 
triggerZeroQueueSizeListener() call
+    }
+
+    @Override
+    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int 
redeliveryCount,
+            ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx 
cnx) {
+        log.warn(
+                "Closing consumer [{}]-[{}] due to unsupported received 
batch-message with zero receiver queue size",
+                subscription, consumerName);
+        // close connection
+        closeAsync().handle((ok, e) -> {
+            // notify callback with failure result
+            notifyPendingReceivedCallback(null,
+                    new PulsarClientException.InvalidMessageException(
+                            format("Unsupported Batch message with 0 size 
receiver queue for [%s]-[%s] ",
+                                    subscription, consumerName)));
+            return null;
+        });
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 657b4f4..e09184c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.testng.Assert;
@@ -56,8 +57,8 @@ public class ConsumerImplTest {
         when(client.getConfiguration()).thenReturn(clientConf);
 
         consumerConf.setSubscriptionName("test-sub");
-        consumer = new ConsumerImpl<ConsumerImpl>(client, topic, consumerConf,
-                executorService, -1, subscribeFuture, null, null);
+        consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
+                executorService, -1, subscribeFuture, 
SubscriptionMode.Durable, null, null, null);
     }
 
     @Test(invocationTimeOut = 1000)

Reply via email to