Before I dive in to the question of IQ and the approaches you proposed, can
you just
elaborate on the problem itself? By definition, the `markAsPartitioned`
flag means that
a repartition would be a no-op, ie that the stream (and its partitioning)
would be the same
whether or not a repartition is inserted. For this to be true, it then has
to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

The left-hand side of the above is precisely how we determine the partition
number that
a key belongs to when using IQ. It shouldn't matter whether the user is
querying a key
in a store upstream of the key-changing operation or a mapped key
downstream of it
-- either way we just apply the given Partitioner.

See StreamsMetadataState#getKeyQueryMetadataForKey
<https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
for where this happens


If we're concerned that users might try to abuse the new
`markAsPartitioned` feature,
or accidentally misuse it, then we could add a runtime check that applies
the Partitioner
associated with that subtopology to the key being processed and the mapped
key result
to assert that they do indeed match. Imo this is probably overkill, just
putting it out there.

On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev <iponoma...@mail.ru.invalid>
wrote:

> Hi Sophie,
>
> thanks for your reply! So your proposal is:
>
> 1). For each key-changing operation, deprecate the existing overloads
> that accept a Named, and replace them with overloads that take an
> operator-specific config object.
> 2). Add `markAsPartitioned` flag to these configs.
>
> IMO, this looks much better than the original proposal, I like it very
> much and I think I will rewrite the KIP soon. I absolutely agree with
> your points. Repartition logic is not a part of the public contract, and
> it's much better to give it correct hints instead of telling explicitly
> what it should do.
>
> ...
>
> Since we're generating such bright ideas, maybe we should also
> brainstorm the interactive query problem?
>
> The problem is that interactive queries will not work properly when
> `markAsPartitioned` is used. Although original key and mapped key will
> be in the same partition, we will no longer be able to guess this
> partition given the mapped key only.
>
> The possible approaches are:
>
> 1) Give up and don't use interactive queries together with
> `markAsPartitioned`. This is what I suppose now. But can we do better?
>
> 2) Maybe we should ask the user to provide 'reverse mapping' that will
> allow IQ to restore the original key in order to choose the correct
> partition. We can place this mapping in our new configuration object. Of
> course, there is no way for KStreams to verify in compile time/startup
> time that the this function is actually the reverse mapping that extract
> the old key from the new one. Users will forget to provide this
> function. Users will provide wrong functions. This all looks too fragile.
>
> 3) Maybe there can be a completely different approach. Let's introduce a
> new entity -- composite keys, consisting of "head" and "tail". The
> partition for the composite key is calculated based on its 'head' value
> only. If we provide a key mapping in form key -> CompositeKey(key,
> tail), then it's obvious that we do not need a repartition. When an
> interactive query needs to guess the partition for CompositeKey, it just
> extracts its head and calculates the correct partition.
>
> We can select CompositeKey before groupByKey() and aggregation
> operations, and this will not involve repartition. And IQ will work.
>
> Is it too daring idea, WDYT? My concern: will it cover all the cases
> when we want to choose a different key, but also avoid repartition?
>
> Regards,
>
> Ivan
>
>
>
> 06.08.2021 23:19, Sophie Blee-Goldman пишет:
> > Hey Ivan
> >
> > I completely agree that adding it as a config to Grouped/Joined/etc isn't
> > much better, I was just
> > listing it for completeness, and that I would prefer to make it a
> > configuration of the key-changing
> > operation itself -- that's what I meant by
> >
> > a better alternative might be to introduce this ... to the config object
> of
> >> the operator that's actually
> >
> > doing the key changing operation
> >
> >
> > I personally believe this is the semantically "correct" way to approach
> > this, since "preserves partitioning"
> > or "does not preserve partitioning" is a property of a key-changing
> > operation and not an operation on the
> > stream itself. Also, this way the user need only tell Streams which
> > operations do or do not preserve the
> > partitioning, and Streams can figure out where to insert a repartition in
> > the topology as it does today.
> >
> > Otherwise, we're rendering this particularly useful feature of the DSL --
> > automatic repartitioning -- pretty
> > much useless, since the user now has to figure out whether a repartition
> is
> > needed. On top of that, they
> > need to have some understanding of where and when this internal automatic
> > repartitioning logic is going
> > to insert that repartition in order to cancel it in the appropriate
> place.
> > Which is pretty unfortunate, since
> > that logic is not part of the public contract: it can change at any time,
> > for example as it did when we introduced
> > the repartition merging optimization.
> >
> > All that said, those are valid concerns regarding the expansion of the
> > API's surface area. Since none of
> > the key-changing operations currently have a config object like some
> other
> > operations (for example Grouped
> > or Consumed, etc), this would double the number of overloads. But maybe
> > this is a good opportunity to fix
> > that problem, rather than keep digging ourselves into holes by trying to
> > work around it.
> >
> > It looks like all of those key-changing operations have two overloads at
> > the moment, one with no parameters
> > beyond the operation itself (eg KeyValueMapper for #selectKey) and the
> > other with an additional Named
> > parameter, which is itself another kind of configuration. What if we
> > instead deprecate the existing overloads
> > that accept a Named, and replace them with overloads that take an
> > operator-specific config object like we do
> > elsewhere (eg Grouped for #groupByKey). Then we can have both Named and
> > this  `markAsPartitioned` flag
> > be part of the general config object, which (a) does not expand the API
> > surface area at all in this KIP, and (b)
> > also protects future KIPs from needing to have this same conversation
> over
> > and over, because we can now
> > stick any additional operator properties into that same config object.
> >
> > WDYT? By the way, the above idea (introducing a single config object to
> > wrap all operator properties) was also
> > raised by John Roesler a while back. Let's hope he hasn't changed his
> mind
> > since then :)
> >
> >
> > On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev <iponoma...@mail.ru.invalid
> >
> > wrote:
> >
> >> Hi Matthias,
> >>
> >> Concerning the naming: I like `markAsPartitioned`, because it describes
> >> what this operation is actually doing!
> >>
> >> Hi Sophie,
> >>
> >> I see the concern about poor code cohesion. We declare key mapping in
> >> one place of code, then later in another place we say
> >> "markAsPartitioned()". When we change the code six months later, we
> >> might forget to remove markAsPartitioned(), especially if it's placed in
> >> another method or class. But I don't understand why do you propose to
> >> include this config into Grouped/Joined/StreamJoined, because from this
> >> point of view it's not a better solution?
> >>
> >> The best approach regarding the cohesion might be to to add an extra
> >> 'preservePartition' flag to every key-changing operation, that is
> >>
> >> 1) selectKey
> >> 2) map
> >> 3) flatMap
> >> 4) transform
> >> 5) flatTransform
> >>
> >> in order to tell if the provided mapping require repartition or not.
> >> Indeed, this is a mapping operation property, not grouping one! BTW: the
> >> idea of adding extra parameter to `selectKey` was once coined by John
> >> Roesler.
> >>
> >> Arguments in favour for this approach: 1) better code cohesion from the
> >> point of view of the user, 2) 'smarter' code (the decision is taken
> >> depending on metadata provided for all the upstream mappings), 3)
> >> overall safer for the user.
> >>
> >> Arguments against: invasive KStreams API change, 5 more method
> >> overloads. Further on, when we add a new key-changing operation to
> >> KStream, we must add an overloaded version with 'preservePartition'.
> >> When we add a new overloaded version for existing operation, we actually
> >> might need to add two or more overloaded versions. This will soon become
> >> a mess.
> >>
> >> I thought that since `markAsPartitioned` is intended for advanced users,
> >> they will use it with care. When you're in a position where every
> >> serialization/deserialization round matters for the latency, you're
> >> extremely careful with the topology and you will not thoughtlessly add
> >> new key-changing operations without controlling how it's going to change
> >> the overall topology.
> >>
> >> By the way, if we later find a better solution, it's way more easy to
> >> deprecate a single `markAsPartitioned` operation than 5 method
> overloads.
> >>
> >> What do you think?
> >>
> >>
> >>
> >>
> >> 04.08.2021 4:23, Sophie Blee-Goldman пишет:
> >>> Do we really need a whole DSL operator for this? I think the original
> >> name
> >>> for this
> >>> operator -- `cancelRepartition()` -- is itself a sign that this is not
> an
> >>> operation on the
> >>> stream itself but rather a command/request to whichever operator would
> >> have
> >>> otherwise triggered this repartition.
> >>>
> >>> What about instead adding a new field to the
> Grouped/Joined/StreamJoined
> >>> config
> >>> objects that signals them to skip the repartitioning?
> >>>
> >>> The one downside to this specific proposal is that you would then need
> to
> >>> specify
> >>> this for every stateful operation downstream of the key-changing
> >> operation.
> >>> So a
> >>> better alternative might be to introduce this `skipRepartition` field,
> or
> >>> whatever we
> >>> want to call it, to the config object of the operator that's actually
> >> doing
> >>> the key
> >>> changing operation which is apparently preserving the partitioning.
> >>>
> >>> Imo this would be more "safe" relative to the current proposal, as the
> >> user
> >>> has to
> >>> explicitly consider whether every key changing operation is indeed
> >>> preserving the
> >>> partitioning. Otherwise you could code up a topology with several key
> >>> changing
> >>> operations at the beginning which do require repartitioning. Then you
> get
> >>> to the end
> >>> of the topology and insert one final key changing operation that
> doesn't,
> >>> assume
> >>> you can just cancel the repartition, and suddenly you're wondering why
> >> your
> >>> results
> >>> are all screwed up
> >>>
> >>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>
> >>>> Thanks for the KIP Ivan!
> >>>>
> >>>> I think it's a good feature to give advanced users more control, and
> >>>> allow them to build more efficient application.
> >>>>
> >>>> Not sure if I like the proposed named though (the good old "naming
> >>>> things" discussion :))
> >>>>
> >>>> Did you consider alternatives? What about
> >>>>
> >>>>    - markAsPartitioned()
> >>>>    - markAsKeyed()
> >>>>    - skipRepartition()
> >>>>
> >>>> Not sure if there are other idea on a good name?
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
> >>>>> Hello,
> >>>>>
> >>>>> I'd like to start a discussion for KIP-759:
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> >>>>>
> >>>>>
> >>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
> >>>>> operator, which turned out to be a separate proposal.
> >>>>>
> >>>>> The proposal is quite trivial, however, we still might consider
> >>>>> alternatives (see 'Possible Alternatives' section).
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Ivan
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to