Good. +1

On Tue, Jul 19, 2022 at 18:00, Anon Hxy <anonhx...@gmail.com> wrote:

> Hi Pulsar community:
>
> I have opened a PIP to discuss "Support batched message using entry filter"
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a plug-in way to filter entries in the broker, namely PIP-105
> https://github.com/apache/pulsar/issues/12269. But this approach only checks
> the batch header, without digging into the properties of the individual
> messages. It does provide an interface to deserialize the entire Entry onto
> the heap, but this adds memory and CPU overhead, and in most scenarios we
> only need a few properties to do the filtering.
>
> This proposal lets PIP-105 support batched entries without having to
> deserialize the entire Entry onto the heap.
>
>
> ## API Changes
>
> - Add a producer config that specifies the property keys whose values will be
> added to the batched entry metadata, for example:
>
> ```
>
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
>
> ```
> The `batchedFilterProperties` type is `List<String>`, and its default value is
> an empty list. An empty list means that the properties of the entry's
> metadata are empty, so the `EntryFilter` will not take effect.
>
> ## Implementation
>
> - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
>  we extract the message properties and add them to `metadata`:
> ```
>  public boolean add(MessageImpl<?> msg, SendCallback callback) {
>
>         if (log.isDebugEnabled()) {
>             log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
>                     topicName, producerName, numMessagesInBatch);
>         }
>
>         if (++numMessagesInBatch == 1) {
>             try {
>                 // some properties are common amongst the different messages
>                 // in the batch, hence we just pick it up from the first message
>                 messageMetadata.setSequenceId(msg.getSequenceId());
>                 List<KeyValue> filterProperties = getProperties(msg);
>                 if (!filterProperties.isEmpty()) {
>                     // add message properties here
>                     messageMetadata.addAllProperties(filterProperties);
>                 }
> ```
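
The `getProperties(msg)` helper is not shown in the proposal. A minimal sketch of what it might do, assuming it simply projects a message's properties onto the keys configured in `batchedFilterProperties`, is given below. The class and method names (`FilterPropertiesSketch`, `selectFilterProperties`) are hypothetical, and plain `Map<String, String>` values stand in for Pulsar's `MessageImpl` and `KeyValue` types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed getProperties(msg) helper.
// It is not part of the Pulsar client; it only illustrates the idea.
class FilterPropertiesSketch {

    // Keeps only the properties whose keys are listed in batchedFilterProperties.
    // Keys that the message does not carry are skipped.
    static Map<String, String> selectFilterProperties(
            Map<String, String> messageProperties,
            List<String> batchedFilterProperties) {
        Map<String, String> selected = new LinkedHashMap<>();
        for (String key : batchedFilterProperties) {
            String value = messageProperties.get(key);
            if (value != null) {
                selected.put(key, value);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("region", "eu", "traceId", "abc");
        // Only "region" is configured, so "traceId" is left out.
        System.out.println(selectFilterProperties(props, List.of("region")));
        // With the default empty list nothing is copied to the batch metadata.
        System.out.println(selectFilterProperties(props, List.of()));
    }
}
```

With the default empty list the result is empty, which matches the statement above that the `EntryFilter` then has nothing to act on.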
>
> - We also need to add a method `hasSameProperties`, analogous to `hasSameSchema`.
> Messages with the same properties can be added to the same batch:
>
> ```
>  private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
>         return batchMessageContainer.haveEnoughSpace(msg)
>                 && (!isMultiSchemaEnabled(false)
>                     || batchMessageContainer.hasSameSchema(msg)
>                     || batchMessageContainer.hasSameProperties(msg))
>                 && batchMessageContainer.hasSameTxn(msg);
>  }
>
> ```
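
The body of `hasSameProperties` is not given in the proposal. One possible shape, assuming the container remembers the filter properties of the first message in the batch and compares later messages against them, is sketched below. `SamePropertiesSketch` and its members are hypothetical names, and messages are modeled as plain property maps rather than `MessageImpl` instances.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a batch container that tracks filter properties.
// It does not use or mirror the real BatchMessageContainerImpl internals.
class SamePropertiesSketch {
    private final List<String> filterKeys;        // plays the role of batchedFilterProperties
    private Map<String, String> batchProperties;  // set when the first message is added
    private int numMessagesInBatch;

    SamePropertiesSketch(List<String> filterKeys) {
        this.filterKeys = filterKeys;
    }

    // Projects a message's properties onto the configured filter keys.
    private Map<String, String> project(Map<String, String> props) {
        Map<String, String> out = new HashMap<>();
        for (String key : filterKeys) {
            if (props.containsKey(key)) {
                out.put(key, props.get(key));
            }
        }
        return out;
    }

    // An empty batch accepts any message; otherwise the projected
    // properties must equal those recorded from the first message.
    boolean hasSameProperties(Map<String, String> props) {
        return numMessagesInBatch == 0 || project(props).equals(batchProperties);
    }

    void add(Map<String, String> props) {
        if (numMessagesInBatch++ == 0) {
            batchProperties = project(props);
        }
    }
}
```

In this sketch, properties outside the configured keys never affect batching, so two messages that differ only in an unlisted property still share a batch.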
>
>
> ## Rejected Alternatives
>
> - Implement an `AbstractBatchMessageContainer` subclass, say
> `BatchMessagePropertiesBasedContainer`, that keeps messages with the same
> properties in a single `hashmap` entry, like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason: This will publish messages out of order.
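
The ordering concern can be illustrated with a small model. It assumes, as a simplification, that a properties-based container groups messages by property value and then flushes one group after another. `GroupedBatchOrderSketch` and `flushOrder` are hypothetical names, and messages are reduced to their position in the send order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of grouping messages by a property value before flushing.
class GroupedBatchOrderSketch {

    // Input: the filter-property value of each message, in send order.
    // Output: the send positions in the order they would be published
    // if each group were flushed as one batch.
    static List<Integer> flushOrder(List<String> propertyOfEachMessage) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (int seq = 0; seq < propertyOfEachMessage.size(); seq++) {
            groups.computeIfAbsent(propertyOfEachMessage.get(seq), k -> new ArrayList<>())
                    .add(seq);
        }
        List<Integer> order = new ArrayList<>();
        for (List<Integer> batch : groups.values()) {
            order.addAll(batch);
        }
        return order;
    }

    public static void main(String[] args) {
        // Messages 0 and 2 share a property value, message 1 does not.
        System.out.println(flushOrder(List.of("eu", "us", "eu")));
    }
}
```

Under these assumptions, message 2 is published before message 1 because it joins the group opened by message 0.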
>
>
>
> Thanks,
> Xiaoyu Hou
>


-- 
BR,
Qiang Huang
