Hi Eno,

I'm not sure.  My understanding is that the cache would prevent two
immediate updates for the same key from being forwarded, but that only
applies when records arrive within commit.interval.ms of each other.  Is
that understanding correct?
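
To make the question concrete, below is a toy model of that understanding in plain Java. It is not Kafka's caching code. CommitIntervalCacheModel, put() and commit() are made-up names, a commit() call stands in for commit.interval.ms elapsing, and cache-size eviction is ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names, not Kafka's implementation): a per-key
// cache that is flushed on each commit, forwarding only the latest value per key.
class CommitIntervalCacheModel {
    private final Map<String, String> dirty = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    // An update overwrites any pending value for the same key.
    void put(String key, String value) {
        dirty.put(key, value);
    }

    // Stands in for commit.interval.ms elapsing: forward every pending entry.
    void commit() {
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            forwarded.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        CommitIntervalCacheModel cache = new CommitIntervalCacheModel();
        cache.put("1", "Brown");
        cache.put("1", "Brown"); // same interval: collapsed into one forward
        cache.commit();
        cache.put("1", "Brown"); // next interval: forwarded again
        cache.commit();
        System.out.println(cache.forwarded()); // [1=Brown, 1=Brown]
    }
}
```

Under this model the identical update in the second interval is still forwarded, which is the gap in question.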

filterRedundant compares the newValue and oldValue in a Change, so it works
regardless of the time between records.
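Here is a minimal sketch of that value-based check, assuming a simple stand-in for the (newValue, oldValue) pair. ValueChange and isRedundant are hypothetical names, not Kafka classes. The arrival times are deliberately hours apart to show that they play no part in the decision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Sketch: compares the old and new value of each update; arrival times are ignored.
class ChangeFilterSketch {
    // Hypothetical stand-in for the (newValue, oldValue) pair a KTable
    // forwards; this is not Kafka's internal Change class.
    static final class ValueChange<V> {
        final V newValue;
        final V oldValue;

        ValueChange(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Redundant when both sides are equal (including both null).
    static <V> boolean isRedundant(ValueChange<V> change) {
        return Objects.equals(change.newValue, change.oldValue);
    }

    public static void main(String[] args) {
        // Arrival times in ms, far more than any commit interval apart.
        long[] arrivalMs = {0L, 3_600_000L, 7_200_000L};
        List<ValueChange<String>> changes = List.of(
                new ValueChange<String>("Brown", null),
                new ValueChange<String>("Brown", "Brown"),
                new ValueChange<String>("Black", "Brown"));
        List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < changes.size(); i++) {
            ValueChange<String> c = changes.get(i);
            if (!isRedundant(c)) {
                forwarded.add(arrivalMs[i] + ":" + c.newValue);
            }
        }
        System.out.println(forwarded); // [0:Brown, 7200000:Black]
    }
}
```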

https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant


The use-case that is currently kicking me is a piece of source data that
contains multiple unrelated configuration fields in a single record; it's
not a great design, but it's the data I have to work with.  I'm plucking
out only a single relevant field with mapValues, but changes to the other
fields within the record are triggering expensive, redundant downstream
recomputations.
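
A rough plain-Java illustration of that situation, with no Kafka dependency. The "timeout" and "theme" fields and the PluckAndFilterDemo class are invented for the example, and the per-key map stands in for the KTable's previous value. Map.of needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustration only: hypothetical config records carry several unrelated
// fields, but downstream only cares about "timeout".
class PluckAndFilterDemo {
    // Last plucked value per key; plays the role of the KTable's old value.
    private final Map<String, String> lastPlucked = new HashMap<>();
    // Updates that would trigger a downstream recomputation.
    private final List<String> forwarded = new ArrayList<>();

    void onRecord(String key, Map<String, String> config) {
        // Same idea as mapValues(v -> v.get("timeout")): keep one field only.
        String plucked = config.get("timeout");
        // put() returns the previous plucked value (null on the first record).
        String previous = lastPlucked.put(key, plucked);
        if (!Objects.equals(previous, plucked)) {
            forwarded.add(key + "=" + plucked);
        }
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        PluckAndFilterDemo demo = new PluckAndFilterDemo();
        demo.onRecord("app", Map.of("timeout", "30", "theme", "dark"));
        demo.onRecord("app", Map.of("timeout", "30", "theme", "light")); // only theme changed
        demo.onRecord("app", Map.of("timeout", "60", "theme", "light"));
        System.out.println(demo.forwarded()); // [app=30, app=60]
    }
}
```

The theme-only change produces no forward, so nothing downstream is recomputed for it.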

Mathieu


On Sun, Dec 4, 2016 at 4:34 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Mathieu,
>
> Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some
> of this for you, in that it dedups records with the same key and prevents
> them from being forwarded downstream?
>
> Eno
> > On 4 Dec 2016, at 04:13, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > Hey all,
> >
> > I'd like to contribute a new KTable API that would allow for the
> > suppression of redundant KTable forwards, and I'd like to solicit
> > feedback before I put together a patch.
> >
> > A typical use-case of this API would be that you're using mapValues to
> > pluck a subset of data out of a topic, but you don't want changes to the
> > record value that leave the output of mapValues unchanged to trigger
> > expensive and redundant downstream recalculations.
> >
> > For example, topic "user" contains key:1, value:{"firstName": "Jack",
> > "lastName": "Brown"}.  builder.topic("user").mapValues((user) ->
> > user.get("lastName"))  will create a KTable that would forward updates
> > from the user topic even if lastName never changed.
> >
> > My proposed API would be to add a filterRedundant method to KTable; one
> > override takes a Comparator<V> to provide a custom comparison for
> > evaluating whether a change is redundant, and one parameterless override
> > would use a comparator backed by the object's equals() method.
> >
> >    /**
> >     * Creates a new instance of {@link KTable} that filters out redundant
> >     * updates and prevents "non-updates" from propagating to further
> >     * operations on the returned table.  A redundant update is one where the
> >     * same value is provided more than once in succession for a given key.
> >     * Object.equals is used to compare whether a subsequent update has the
> >     * same value.
> >     *
> >     * @return a {@link KTable} that contains the same values as this table,
> >     *         but suppresses redundant updates
> >     */
> >    KTable<K, V> filterRedundant();
> >
> >    /**
> >     * Creates a new instance of {@link KTable} that filters out redundant
> >     * updates and prevents "non-updates" from propagating to further
> >     * operations on the returned table.  A redundant update is one where the
> >     * same value is provided more than once in succession for a given key.
> >     * A user-provided comparator is used to compare whether a subsequent
> >     * update has the same value.
> >     *
> >     * @return a {@link KTable} that contains the same values as this table,
> >     *         but suppresses redundant updates
> >     */
> >    KTable<K, V> filterRedundant(Comparator<V> comparator);
>
>
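
A plain-Java sketch of how the two proposed overloads could relate, assuming the parameterless form simply delegates to an equals()-backed comparator. RedundantUpdateFilter and accept() are hypothetical names, not Kafka Streams API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the two proposed overloads; not part of Kafka Streams.
class RedundantUpdateFilter<K, V> {
    private final Comparator<V> comparator;
    private final Map<K, V> current = new HashMap<>();

    // Mirrors filterRedundant(): a comparator backed by equals(). It only
    // reports "same" (0) or "different" (1), so it is not a valid ordering
    // and is used here purely as an equality test.
    RedundantUpdateFilter() {
        this((a, b) -> Objects.equals(a, b) ? 0 : 1);
    }

    // Mirrors filterRedundant(Comparator<V> comparator).
    RedundantUpdateFilter(Comparator<V> comparator) {
        this.comparator = comparator;
    }

    // Returns true if the update should be forwarded downstream.
    // A missing previous value (first update for a key) counts as null.
    boolean accept(K key, V newValue) {
        V oldValue = current.put(key, newValue);
        if (oldValue == null || newValue == null) {
            return oldValue != newValue; // forward unless both are null
        }
        return comparator.compare(oldValue, newValue) != 0;
    }

    public static void main(String[] args) {
        RedundantUpdateFilter<Integer, String> byEquals = new RedundantUpdateFilter<>();
        System.out.println(byEquals.accept(1, "Brown")); // true
        System.out.println(byEquals.accept(1, "Brown")); // false
        System.out.println(byEquals.accept(1, "brown")); // true

        RedundantUpdateFilter<Integer, String> ignoreCase =
                new RedundantUpdateFilter<>(String.CASE_INSENSITIVE_ORDER);
        System.out.println(ignoreCase.accept(1, "Brown")); // true
        System.out.println(ignoreCase.accept(1, "brown")); // false
    }
}
```

The comparator overload lets a caller decide that values differing only in ways downstream ignores (here, letter case) still count as redundant.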
