Hi Enrico

Thanks for your reply.

> 1) It is not clear to me if you want to push all the messages metadata
> in the main header of the entry or only the metadata of the first
> entry, or only the KEY.

I only want to copy a subset of the message properties (we call them batch
properties here) to the main header of the entry, without modifying the
payload or the single-message metadata. All messages in one batch must have
the same batch properties (both key and value).
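To make the rule concrete, here is a minimal sketch in plain Java. It is not the actual client code: the class `BatchPropertiesSketch` and both of its methods are hypothetical names, and message properties are modelled as a simple `Map<String, String>` instead of the real metadata types. It only shows the idea that the configured keys are copied from the first message into the header, and that a later message may join the batch only if its values for those keys match.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Pulsar client: it only models the rule
// that every message in a batch must carry the same batch properties.
final class BatchPropertiesSketch {

    // Copy only the configured keys (e.g. batchedFilterProperties) out of a
    // message's properties; keys the message does not set are skipped.
    static Map<String, String> extractBatchProperties(
            Map<String, String> messageProperties, List<String> filterKeys) {
        Map<String, String> batchProperties = new LinkedHashMap<>();
        for (String key : filterKeys) {
            String value = messageProperties.get(key);
            if (value != null) {
                batchProperties.put(key, value);
            }
        }
        return batchProperties;
    }

    // A message may join the current batch only if its batch properties match
    // the ones already recorded in the batch's main header (keys and values).
    static boolean hasSameProperties(
            Map<String, String> headerProperties,
            Map<String, String> messageProperties,
            List<String> filterKeys) {
        return headerProperties.equals(
                extractBatchProperties(messageProperties, filterKeys));
    }
}
```

In this sketch, properties outside the configured keys are ignored, so two messages that differ only in an unrelated property can still share a batch.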

> 2) we have to clarify the new message format of the entry and update
> the protocol documents

It seems that the protocol documents do not describe the main header of a
batched message; they only describe `SingleMessageMetadata`. I agree that
it would be better to update the protocol documents to describe the batch
message main header.

> 3) existing EntryFilters won't be able to work well with the new
> format, we must find a way to make them fail and not process garbage

It seems that there are no compatibility problems, because I only add
properties to the main header, which had no properties before, and the
payload and single-message metadata are not modified. Existing
EntryFilters will continue to work with the new format, as will existing
Protocol Handlers. :)
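As a rough illustration of why older entries are unaffected, here is a sketch of a filter that decides from the main-header properties alone. It is plain Java and does not use the real `EntryFilter` interface; `HeaderOnlyFilterSketch`, `Decision`, and `filter` are hypothetical names, and the header properties are modelled as a `Map<String, String>`. An entry whose header has no properties, as before this proposal, is simply accepted.

```java
import java.util.Map;

// Hypothetical stand-in for a broker-side filter; it does not use the real
// EntryFilter interface and only models the decision logic.
final class HeaderOnlyFilterSketch {

    enum Decision { ACCEPT, REJECT }

    // Decide from main-header properties alone, with no payload deserialization.
    // An entry with no header properties (the format before this proposal) is
    // accepted, so it is delivered as it would be without the filter.
    static Decision filter(Map<String, String> headerProperties,
                           String key, String expectedValue) {
        if (headerProperties.isEmpty()) {
            return Decision.ACCEPT;
        }
        return expectedValue.equals(headerProperties.get(key))
                ? Decision.ACCEPT
                : Decision.REJECT;
    }
}
```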

Thanks
Xiaoyu Hou

On Thu, Jul 21, 2022 at 20:27, Enrico Olivelli <eolive...@gmail.com> wrote:

> Thank you for this proposal!
>
> I understand the problem, and I have already thought about it, because
> I am the author of some filters (especially the JMS Selectors Filter)
> but we have to clarify more about this PIP.
>
> 1) It is not clear to me if you want to push all the messages metadata
> in the main header of the entry or only the metadata of the first
> entry, or only the KEY.
> 2) we have to clarify the new message format of the entry and update
> the protocol documents
> 3) existing EntryFilters won't be able to work well with the new
> format, we must find a way to make them fail and not process garbage
> 4) the same problem applies to Interceptors and Protocol Handlers
> (like KOP), we must make it clear in this PIP what is the upgrade path
> and give some suggestions to the developers of such components
>
> I support this initiative, as long as we clarify those points.
>
> The Pulsar ecosystem is becoming more and more mature, and the user
> base is growing quickly, we can't break things without a clear path
> for the users and for the developers of extensions
>
> Enrico
>
>
> On Thu, Jul 21, 2022 at 12:45, Qiang Huang
> <qiang.huang1...@gmail.com> wrote:
> >
> > Good. +1
> >
> > On Tue, Jul 19, 2022 at 18:00, Anon Hxy <anonhx...@gmail.com> wrote:
> >
> > > Hi Pulsar community:
> > >
> > > > I have opened a PIP to discuss "Support batched message using entry filter"
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/16680
> > > ---
> > >
> > > ## Motivation
> > >
> > > > We already have a plug-in way to filter entries in the broker, aka PIP-105
> > > > https://github.com/apache/pulsar/issues/12269. But this way only checks
> > > > the batch header, without digging into the individual message properties.
> > > > Of course it provides an interface to deserialize the entire Entry to the
> > > > heap, but this adds memory and CPU load, and in most scenarios we only
> > > > need a few properties to do the filtering.
> > > >
> > > > This proposal brings a method to make PIP-105 support batched entries without
> > > > having to deserialize the entire Entry to the heap.
> > >
> > >
> > > ## API Changes
> > >
> > > > - Add a producer config to specify the keys whose properties will be
> > > > added to the batched entry metadata, for example:
> > >
> > > ```
> > >
> > >
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> > >
> > > ```
> > > > The `batchedFilterProperties` type is `List<String>`, and its default value
> > > > is an empty list. An empty list means that the properties of the entry's
> > > > metadata are empty, and the `EntryFilter` will not take effect.
> > >
> > > ## Implementation
> > >
> > > > - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> > > > we extract the message properties and add them to `metadata`:
> > > ```
> > >  public boolean add(MessageImpl<?> msg, SendCallback callback) {
> > >
> > >         if (log.isDebugEnabled()) {
> > > >             log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
> > > >                     topicName, producerName, numMessagesInBatch);
> > > >         }
> > > >
> > > >         if (++numMessagesInBatch == 1) {
> > > >             try {
> > > >                 // some properties are common amongst the different messages
> > > >                 // in the batch, hence we just pick them up from the first message
> > > >                 messageMetadata.setSequenceId(msg.getSequenceId());
> > > >                 List<KeyValue> filterProperties = getProperties(msg);
> > > >                 if (!filterProperties.isEmpty()) {
> > > >                     // add message properties here
> > > >                     messageMetadata.addAllProperties(filterProperties);
> > >                 }
> > > ```
> > >
> > > > - We also need to add a method `hasSameProperties`, like `hasSameSchema`.
> > > > Messages with the same properties can be added to the same batch:
> > >
> > > ```
> > >  private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
> > >         return batchMessageContainer.haveEnoughSpace(msg)
> > > >                 && (!isMultiSchemaEnabled(false)
> > > >                     || batchMessageContainer.hasSameSchema(msg))
> > > >                 // properties must match regardless of the schema check
> > > >                 && batchMessageContainer.hasSameProperties(msg)
> > > >                 && batchMessageContainer.hasSameTxn(msg);
> > >     }
> > >
> > > ```
> > >
> > >
> > > > ## Rejected Alternatives
> > > >
> > > > - Implement an `AbstractBatchMessageContainer`, say
> > > > `BatchMessagePropertiesBasedContainer`, keeping messages with the same
> > > > properties in a single `hashmap` entry, like
> > > > `BatchMessageKeyBasedContainer`.
> > > >
> > > > Rejection reason: This will publish messages out of order.
> > >
> > >
> > >
> > > Thanks,
> > > Xiaoyu Hou
> > >
> >
> >
> > --
> > BR,
> > Qiang Huang
>
