eolivelli commented on a change in pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#discussion_r721740084



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -996,6 +996,130 @@ void activeConsumerChanged(boolean isActive) {
         });
     }
 
+    protected boolean isBatch(MessageMetadata messageMetadata) {
+        // if message is not decryptable then it can't be parsed as a 
batch-message. so, add EncyrptionCtx to message
+        // and return undecrypted payload
+        return !isMessageUndecryptable(messageMetadata) &&
+                (messageMetadata.hasNumMessagesInBatch() || 
messageMetadata.getNumMessagesInBatch() != 1);
+    }
+
+    protected <U> MessageImpl<U> newSingleMessage(final int index,
+                                                  final int numMessages,
+                                                  final BrokerEntryMetadata 
brokerEntryMetadata,
+                                                  final MessageMetadata 
msgMetadata,
+                                                  final SingleMessageMetadata 
singleMessageMetadata,
+                                                  final ByteBuf payload,
+                                                  final MessageIdImpl 
messageId,
+                                                  final Schema<U> schema,
+                                                  final boolean 
containMetadata,
+                                                  final BitSetRecyclable 
ackBitSet,
+                                                  final BatchMessageAcker 
acker,
+                                                  final int redeliveryCount) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] processing message num - {} in batch", 
subscription, consumerName, index);
+        }
+
+        ByteBuf singleMessagePayload = null;
+        try {
+            if (containMetadata) {
+                singleMessagePayload =
+                        Commands.deSerializeSingleMessageInBatch(payload, 
singleMessageMetadata, index, numMessages);
+            }
+
+            if (isSameEntry(messageId) && isPriorBatchIndex(index)) {
+                // If we are receiving a batch message, we need to discard 
messages that were prior
+                // to the startMessageId
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Ignoring message from before the 
startMessageId: {}", subscription,
+                            consumerName, startMessageId);
+                }
+                return null;
+            }
+
+            if (singleMessageMetadata != null && 
singleMessageMetadata.isCompactedOut()) {
+                // message has been compacted out, so don't send to the user
+                return null;
+            }
+
+            if (ackBitSet != null && !ackBitSet.get(index)) {
+                return null;
+            }
+
+            BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
+                    messageId.getEntryId(), getPartitionIndex(), index, 
numMessages, acker);
+
+            final ByteBuf payloadBuffer = (singleMessagePayload != null) ? 
singleMessagePayload : payload;
+            return MessageImpl.create(topicName.toString(), batchMessageIdImpl,
+                    msgMetadata, singleMessageMetadata, payloadBuffer,
+                    createEncryptionContext(msgMetadata), cnx(), schema, 
redeliveryCount, poolMessages
+            ).setBrokerEntryMetadata(brokerEntryMetadata);
+        } catch (IOException | IllegalStateException e) {
+            throw new IllegalStateException(e);
+        } finally {
+            if (singleMessagePayload != null) {
+                singleMessagePayload.release();
+            }
+        }
+    }
+
+    protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId,
+                                            final BrokerEntryMetadata 
brokerEntryMetadata,
+                                            final MessageMetadata 
messageMetadata,
+                                            final ByteBuf payload,
+                                            final Schema<U> schema,
+                                            final int redeliveryCount) {
+        return MessageImpl.create(topicName.toString(), messageId, 
messageMetadata, payload,
+                createEncryptionContext(messageMetadata), cnx(), schema, 
redeliveryCount, poolMessages
+        ).setBrokerEntryMetadata(brokerEntryMetadata);
+    }
+
+    private void executeNotifyCallback(final MessageImpl<T> message) {
+        // Enqueue the message so that it can be retrieved when application 
calls receive()
+        // if the conf.getReceiverQueueSize() is 0 then discard message if no 
one is waiting for it.
+        // if asyncReceive is waiting then notify callback without adding to 
incomingMessages queue
+        internalPinnedExecutor.execute(() -> {
+            if (hasNextPendingReceive()) {
+                notifyPendingReceivedCallback(message, null);
+            } else if (enqueueMessageAndCheckBatchReceive(message) && 
hasPendingBatchReceive()) {
+                notifyPendingBatchReceivedCallBack();
+            }
+        });
+    }
+
+    private void processPayloadByProcessor(final BrokerEntryMetadata 
brokerEntryMetadata,
+                                           final MessageMetadata 
messageMetadata,
+                                           final ByteBuf byteBuf,
+                                           final MessageIdImpl messageId,
+                                           final Schema<T> schema,
+                                           final int redeliveryCount,
+                                           final List<Long> ackSet) {
+        final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
+        final MessagePayloadContextImpl entryContext = 
MessagePayloadContextImpl.get(
+                brokerEntryMetadata, messageMetadata, messageId, this, 
redeliveryCount, ackSet);
+        final AtomicInteger skippedMessages = new AtomicInteger(0);
+        try {
+            conf.getPayloadProcessor().process(payload, entryContext, schema, 
message -> {
+                if (message == null) {
+                    skippedMessages.incrementAndGet();
+                }
+                executeNotifyCallback((MessageImpl<T>) message);
+            });

Review comment:
       Here `message` can be null.
   Should we skip the `executeNotifyCallback` call when it is null?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -625,8 +625,9 @@ public BrokerEntryMetadata getBrokerEntryMetadata() {
         return brokerEntryMetadata;
     }
 
-    public void setBrokerEntryMetadata(BrokerEntryMetadata 
brokerEntryMetadata) {
+    public MessageImpl<T> setBrokerEntryMetadata(BrokerEntryMetadata 
brokerEntryMetadata) {

Review comment:
       Do we really need to change this method?

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * A batch message whose format is customized.
+ *
+ * 1. First 2 bytes represent the number of messages.
+ * 2. Each message is a string, whose format is
+ *   1. First 2 bytes represent the length `N`.
+ *   2. Followed N bytes are the bytes of the string.
+ */
+public class CustomBatchFormat {
+
+    public static final String KEY = "entry.format";
+    public static final String VALUE = "custom";
+
+    @AllArgsConstructor
+    @Getter
+    public static class Metadata {
+        private final int numMessages;
+    }
+
+    public static ByteBuf serialize(Iterable<String> strings) {
+        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024);
+        buf.writeShort(0);
+        short numMessages = 0;
+        for (String s : strings) {
+            writeString(buf, s);
+            numMessages++;
+        }
+        buf.setShort(0, numMessages);
+        return buf;
+    }
+
+    private static void writeString(final ByteBuf buf, final String s) {
+        final byte[] bytes = Schema.STRING.encode(s);
+        buf.writeShort(bytes.length);
+        buf.writeBytes(bytes);
+    }
+
+    public static Metadata readMetadata(final ByteBuf buf) {
+        return new Metadata(buf.readShort());
+    }
+
+    public static byte[] readMessage(final ByteBuf buf) {
+        final short length = buf.readShort();
+        final byte[] bytes = new byte[length];
+        buf.readBytes(bytes);
+        return bytes;
+    }
+
+    @Test
+    public void testMultipleStrings() {

Review comment:
       Will Surefire pick up this test even if the class name does not end with 
`Test`?
   
   This usually doesn't work. You can run the test in the IDE, but it probably 
won't run on CI.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.pulsar.client.api.MessagePayload;
+
+/**
+ * A wrapper of {@link ByteBuf} that implements {@link MessagePayload}.
+ */
+public class MessagePayloadImpl implements MessagePayload {
+
+    private static final Recycler<MessagePayloadImpl> RECYCLER = new 
Recycler<MessagePayloadImpl>() {
+        @Override
+        protected MessagePayloadImpl newObject(Handle<MessagePayloadImpl> 
handle) {
+            return new MessagePayloadImpl(handle);
+        }
+    };
+    private static final byte[] EMPTY_BYTES = new byte[0];
+
+    private final Recycler.Handle<MessagePayloadImpl> recyclerHandle;
+    @Getter
+    private ByteBuf byteBuf;
+
+    public static MessagePayloadImpl create(@NonNull final ByteBuf byteBuf) {
+        final MessagePayloadImpl payload = RECYCLER.get();
+        payload.byteBuf = byteBuf;
+        return payload;
+    }
+
+    private MessagePayloadImpl(final Recycler.Handle<MessagePayloadImpl> 
handle) {
+        this.recyclerHandle = handle;
+    }
+
+    @Override
+    public void release() {
+        ReferenceCountUtil.release(byteBuf);
+        byteBuf = null;
+        recyclerHandle.recycle(this);
+    }
+
+    @Override
+    public byte[] copiedBuffer() {
+        final int readable = byteBuf.readableBytes();
+        if (readable > 0) {
+            final byte[] bytes = new byte[readable];
+            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
+            return bytes;
+        } else {
+            return EMPTY_BYTES;

Review comment:
       This case is unlikely to happen.
   It may be confusing, as we expect to return a copied array here, but for 
empty arrays we are returning exactly the same reference.
   I suggest returning a new empty array.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to