eolivelli commented on a change in pull request #12088: URL: https://github.com/apache/pulsar/pull/12088#discussion_r721740084
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ########## @@ -996,6 +996,130 @@ void activeConsumerChanged(boolean isActive) { }); } + protected boolean isBatch(MessageMetadata messageMetadata) { + // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message + // and return undecrypted payload + return !isMessageUndecryptable(messageMetadata) && + (messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1); + } + + protected <U> MessageImpl<U> newSingleMessage(final int index, + final int numMessages, + final BrokerEntryMetadata brokerEntryMetadata, + final MessageMetadata msgMetadata, + final SingleMessageMetadata singleMessageMetadata, + final ByteBuf payload, + final MessageIdImpl messageId, + final Schema<U> schema, + final boolean containMetadata, + final BitSetRecyclable ackBitSet, + final BatchMessageAcker acker, + final int redeliveryCount) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index); + } + + ByteBuf singleMessagePayload = null; + try { + if (containMetadata) { + singleMessagePayload = + Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages); + } + + if (isSameEntry(messageId) && isPriorBatchIndex(index)) { + // If we are receiving a batch message, we need to discard messages that were prior + // to the startMessageId + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, + consumerName, startMessageId); + } + return null; + } + + if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) { + // message has been compacted out, so don't send to the user + return null; + } + + if (ackBitSet != null && !ackBitSet.get(index)) { + return null; + } + + BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(), + messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker); + + final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload; + return MessageImpl.create(topicName.toString(), batchMessageIdImpl, + msgMetadata, singleMessageMetadata, payloadBuffer, + createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages + ).setBrokerEntryMetadata(brokerEntryMetadata); + } catch (IOException | IllegalStateException e) { + throw new IllegalStateException(e); + } finally { + if (singleMessagePayload != null) { + singleMessagePayload.release(); + } + } + } + + protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId, + final BrokerEntryMetadata brokerEntryMetadata, + final MessageMetadata messageMetadata, + final ByteBuf payload, + final Schema<U> schema, + final int redeliveryCount) { + return MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload, + createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages + ).setBrokerEntryMetadata(brokerEntryMetadata); + } + + private void executeNotifyCallback(final MessageImpl<T> message) { + // Enqueue the message so that it can be retrieved when application calls receive() + // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. 
+ // if asyncReceive is waiting then notify callback without adding to incomingMessages queue + internalPinnedExecutor.execute(() -> { + if (hasNextPendingReceive()) { + notifyPendingReceivedCallback(message, null); + } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { + notifyPendingBatchReceivedCallBack(); + } + }); + } + + private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata, + final MessageMetadata messageMetadata, + final ByteBuf byteBuf, + final MessageIdImpl messageId, + final Schema<T> schema, + final int redeliveryCount, + final List<Long> ackSet) { + final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf); + final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get( + brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet); + final AtomicInteger skippedMessages = new AtomicInteger(0); + try { + conf.getPayloadProcessor().process(payload, entryContext, schema, message -> { + if (message == null) { + skippedMessages.incrementAndGet(); + } + executeNotifyCallback((MessageImpl<T>) message); + }); Review comment: Here `message` can be null. Should we skip this call? ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java ########## @@ -625,8 +625,9 @@ public BrokerEntryMetadata getBrokerEntryMetadata() { return brokerEntryMetadata; } - public void setBrokerEntryMetadata(BrokerEntryMetadata brokerEntryMetadata) { + public MessageImpl<T> setBrokerEntryMetadata(BrokerEntryMetadata brokerEntryMetadata) { Review comment: Do we really need to change this method? ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.processor; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * A batch message whose format is customized. + * + * 1. First 2 bytes represent the number of messages. + * 2. Each message is a string, whose format is + * 1. First 2 bytes represent the length `N`. + * 2. Followed N bytes are the bytes of the string. 
+ */ +public class CustomBatchFormat { + + public static final String KEY = "entry.format"; + public static final String VALUE = "custom"; + + @AllArgsConstructor + @Getter + public static class Metadata { + private final int numMessages; + } + + public static ByteBuf serialize(Iterable<String> strings) { + final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024); + buf.writeShort(0); + short numMessages = 0; + for (String s : strings) { + writeString(buf, s); + numMessages++; + } + buf.setShort(0, numMessages); + return buf; + } + + private static void writeString(final ByteBuf buf, final String s) { + final byte[] bytes = Schema.STRING.encode(s); + buf.writeShort(bytes.length); + buf.writeBytes(bytes); + } + + public static Metadata readMetadata(final ByteBuf buf) { + return new Metadata(buf.readShort()); + } + + public static byte[] readMessage(final ByteBuf buf) { + final short length = buf.readShort(); + final byte[] bytes = new byte[length]; + buf.readBytes(bytes); + return bytes; + } + + @Test + public void testMultipleStrings() { Review comment: Will Surefire pick up this test even though the class name does not end with `Test`? It usually doesn't: you can run the test from the IDE, but it probably won't run on CI. ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java ########## @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; +import lombok.Getter; +import lombok.NonNull; +import org.apache.pulsar.client.api.MessagePayload; + +/** + * A wrapper of {@link ByteBuf} that implements {@link MessagePayload}. + */ +public class MessagePayloadImpl implements MessagePayload { + + private static final Recycler<MessagePayloadImpl> RECYCLER = new Recycler<MessagePayloadImpl>() { + @Override + protected MessagePayloadImpl newObject(Handle<MessagePayloadImpl> handle) { + return new MessagePayloadImpl(handle); + } + }; + private static final byte[] EMPTY_BYTES = new byte[0]; + + private final Recycler.Handle<MessagePayloadImpl> recyclerHandle; + @Getter + private ByteBuf byteBuf; + + public static MessagePayloadImpl create(@NonNull final ByteBuf byteBuf) { + final MessagePayloadImpl payload = RECYCLER.get(); + payload.byteBuf = byteBuf; + return payload; + } + + private MessagePayloadImpl(final Recycler.Handle<MessagePayloadImpl> handle) { + this.recyclerHandle = handle; + } + + @Override + public void release() { + ReferenceCountUtil.release(byteBuf); + byteBuf = null; + recyclerHandle.recycle(this); + } + + @Override + public byte[] copiedBuffer() { + final int readable = byteBuf.readableBytes(); + if (readable > 0) { + final byte[] bytes = new byte[readable]; + byteBuf.getBytes(byteBuf.readerIndex(), bytes); + return bytes; + } else { + return EMPTY_BYTES; Review comment: This case is unlikely to happen. 
It may be confusing: callers expect `copiedBuffer()` to return a freshly copied array, but for an empty payload we return the same shared reference every time. I suggest returning a new empty array instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org