AnonHxy opened a new issue, #16680:
URL: https://github.com/apache/pulsar/issues/16680

   
   ## Motivation
   
   Before We already haved  a plug-in way to filter entries in broker, aka 
PIP-105 https://github.com/apache/pulsar/issues/12269.  But this way only 
checks at the batch header,  without digging into the individual messages 
properties. Of course it provides a interface to deserialize the entire Entry 
to the heap,  But this will bring some memory and cpu workload. And in most 
scenario we only need partial of the properties to do some filter. 
   
   This proposal brings a method to make PIP-105 support batched entry without 
having to deserialize the entire Entry to the heap
   
   
   ## API Changes
   
   - Add a producer config to specialize the key, of which properties will be 
added to the batched entry metadata, for example:
   
   ```
   
org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
   
   ```
   The  `batchedFilterProperties` type is `List<String>` with default value is 
emtpy list.  For empty list, it means that the properties of entry's metadata 
is empty, and the `EntryFilter` will not take effect.
   
   ## Implementation
   
   - When call `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,  
we extract the message properties and add it to `metadata`:
   ```
    public boolean add(MessageImpl<?> msg, SendCallback callback) {
   
           if (log.isDebugEnabled()) {
               log.debug("[{}] [{}] add message to batch, num messages in batch 
so far {}", topicName, producerName,
                       numMessagesInBatch);
           }
   
           if (++numMessagesInBatch == 1) {
               try {
                   // some properties are common amongst the different messages 
in the batch, hence we just pick it up from
                   // the first message
                   messageMetadata.setSequenceId(msg.getSequenceId());
                   List<KeyValue> filterProperties = getProperties(msg);
                   if (!filterProperties.isEmpty()) {
                       messageMetadata.addAllProperties(filterProperties);  // 
and message properties here
                   }
   ```
   
   -  Also we need add a method `hasSameProperties` like `hasSameSchema`.  
Messages with same properties can be added to the same batch:
   
   ```
    private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
           return batchMessageContainer.haveEnoughSpace(msg)
                  && (!isMultiSchemaEnabled(false)
                       || batchMessageContainer.hasSameSchema(msg)
                       || batchMessageContainer.hasSameProperties(msg))
                   && batchMessageContainer.hasSameTxn(msg);
       }
   
   ```
   
   
   ## Reject Alternatives
   
   - Implement a `AbstractBatchMessageContainer` ,  saying 
`BatchMessagePropertiesBasedContainer`, keeping messages with same properties 
in a single `hashmap` entry,  like `BatchMessageKeyBasedContainer`.
   
   Rejection reason:  This will publish messages out of order 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to