Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:
Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream<K, V> stream = ...
KStream<SK, V> stream2 =
   stream.selectKey(/*set superkey*/)
         .markAsPartitioned()

We only need a `Function<SK, K>` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "revers mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream<Tuple,...> and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitoned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points
look convincing for me.

I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for the
purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring to
deprecate overloads with `Named` and introduce overloads with dedicated
configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing
APIs and to introduce a single `markAsPartitioned()` method, which
itself can be easily deprecated if we find a better solution later!


==2. The IQ problem==

it then has to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))


Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This
is actually what we often do before aggregation, and this mapping does
not require repartition.

But of course we can extract the original key from Tuple(key, anyValue),
and this can save IQ and joins!

This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples
(CompositeKeys) and uses only the 'head' to calculate the partition,

2. implement

selectCompositeKey(BiFunction<K, V> tailSelector) {
   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v));
   //MARK_AS_PARTITIONED call here,
   //but this call is an implementation detail and we do not expose
   //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)

09.08.2021 2:38, Matthias J. Sax пишет:
Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it
though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always to together. If you have
some other place in the code that get a `KStream` passed an input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, you can shoot
themselves into the foot if they want with the config object pattern,
too: if you get a `KStream` passed in, you can skip repartitioning via
`selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
slightly prefer to add `markAsPartitioned()` as an operator.

(Maybe we should have expose a `PartitionedKStream` as first class
object to begin with... Hard to introduce now I guess...)


The concern about IQ is interesting -- I did not realize this impact.
Thanks for bringing it up.

a repartition would be a no-op, ie that the stream (and its
partitioning)
would be the same
whether or not a repartition is inserted. For this to be true, it
then has
to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

@Sophie: I don't think this statement is correct. A `markAsPartition()`
only means, that the existing partitioning ensure that all messages of
the same new key are still in the same partition. Ie, it cannot happen
that two new keys (that are the same) are in a different partition.

However, if you would physically repartitiong on the new key using the
same hash-function as for the old key, there is no guarantee that the
same partitions would be picked... And that is why IQ breaks downstream.

Btw: using `markAsPartitioned()` could also be an issue for joins for
the same reason... I want to call out, that the Jira tickets that did
raise the concern about unnecessary repartitioning are about downstream
aggregations though...

Last but not least: we actually have a similar situation for
windowed-aggregations: The result of a window aggregation is partitioned
by the "plain key": if we write the result into a topic using the same
partitioning function, we would write to different partitions... (I
guess it was never an issue so far, as we don't have KIP-300 in place
yet...)

It's also not an issue for IQ, because we know the plain key, and thus
can route to the right task.


About a solution: I think it might be ok to say we don't need to solve
this problem, but it's the users responsibility to take IQ into account.
Ie, if they want to use IQ downstream, the need to repartition: for this
case, repartitioning is _NOT_ unnecessary... The same argument seems to
apply for the join case I mentioned above. -- Given that
`markAsPartitioned()` is an advanced feature, it seems ok to leave it to
the user to use correctly (we should of course call it out in the docs!).



-Matthias



On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
Before I dive in to the question of IQ and the approaches you
proposed, can
you just
elaborate on the problem itself? By definition, the `markAsPartitioned`
flag means that
a repartition would be a no-op, ie that the stream (and its
partitioning)
would be the same
whether or not a repartition is inserted. For this to be true, it
then has
to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

The left-hand side of the above is precisely how we determine the
partition
number that
a key belongs to when using IQ. It shouldn't matter whether the user is
querying a key
in a store upstream of the key-changing operation or a mapped key
downstream of it
-- either way we just apply the given Partitioner.

See StreamsMetadataState#getKeyQueryMetadataForKey
<https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>

for where this happens


If we're concerned that users might try to abuse the new
`markAsPartitioned` feature,
or accidentally misuse it, then we could add a runtime check that
applies
the Partitioner
associated with that subtopology to the key being processed and the
mapped
key result
to assert that they do indeed match. Imo this is probably overkill, just
putting it out there.

On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev
<iponoma...@mail.ru.invalid>
wrote:

Hi Sophie,

thanks for your reply! So your proposal is:

1). For each key-changing operation, deprecate the existing overloads
that accept a Named, and replace them with overloads that take an
operator-specific config object.
2). Add `markAsPartitioned` flag to these configs.

IMO, this looks much better than the original proposal, I like it very
much and I think I will rewrite the KIP soon. I absolutely agree with
your points. Repartition logic is not a part of the public contract,
and
it's much better to give it correct hints instead of telling explicitly
what it should do.

...

Since we're generating such bright ideas, maybe we should also
brainstorm the interactive query problem?

The problem is that interactive queries will not work properly when
`markAsPartitioned` is used. Although original key and mapped key will
be in the same partition, we will no longer be able to guess this
partition given the mapped key only.

The possible approaches are:

1) Give up and don't use interactive queries together with
`markAsPartitioned`. This is what I suppose now. But can we do better?

2) Maybe we should ask the user to provide 'reverse mapping' that will
allow IQ to restore the original key in order to choose the correct
partition. We can place this mapping in our new configuration
object. Of
course, there is no way for KStreams to verify in compile time/startup
time that the this function is actually the reverse mapping that
extract
the old key from the new one. Users will forget to provide this
function. Users will provide wrong functions. This all looks too
fragile.

3) Maybe there can be a completely different approach. Let's
introduce a
new entity -- composite keys, consisting of "head" and "tail". The
partition for the composite key is calculated based on its 'head' value
only. If we provide a key mapping in form key -> CompositeKey(key,
tail), then it's obvious that we do not need a repartition. When an
interactive query needs to guess the partition for CompositeKey, it
just
extracts its head and calculates the correct partition.

We can select CompositeKey before groupByKey() and aggregation
operations, and this will not involve repartition. And IQ will work.

Is it too daring idea, WDYT? My concern: will it cover all the cases
when we want to choose a different key, but also avoid repartition?

Regards,

Ivan



06.08.2021 23:19, Sophie Blee-Goldman пишет:
Hey Ivan

I completely agree that adding it as a config to Grouped/Joined/etc
isn't
much better, I was just
listing it for completeness, and that I would prefer to make it a
configuration of the key-changing
operation itself -- that's what I meant by

a better alternative might be to introduce this ... to the config
object
of
the operator that's actually

doing the key changing operation


I personally believe this is the semantically "correct" way to
approach
this, since "preserves partitioning"
or "does not preserve partitioning" is a property of a key-changing
operation and not an operation on the
stream itself. Also, this way the user need only tell Streams which
operations do or do not preserve the
partitioning, and Streams can figure out where to insert a
repartition in
the topology as it does today.

Otherwise, we're rendering this particularly useful feature of the
DSL --
automatic repartitioning -- pretty
much useless, since the user now has to figure out whether a
repartition
is
needed. On top of that, they
need to have some understanding of where and when this internal
automatic
repartitioning logic is going
to insert that repartition in order to cancel it in the appropriate
place.
Which is pretty unfortunate, since
that logic is not part of the public contract: it can change at any
time,
for example as it did when we introduced
the repartition merging optimization.

All that said, those are valid concerns regarding the expansion of the
API's surface area. Since none of
the key-changing operations currently have a config object like some
other
operations (for example Grouped
or Consumed, etc), this would double the number of overloads. But
maybe
this is a good opportunity to fix
that problem, rather than keep digging ourselves into holes by
trying to
work around it.

It looks like all of those key-changing operations have two
overloads at
the moment, one with no parameters
beyond the operation itself (eg KeyValueMapper for #selectKey) and the
other with an additional Named
parameter, which is itself another kind of configuration. What if we
instead deprecate the existing overloads
that accept a Named, and replace them with overloads that take an
operator-specific config object like we do
elsewhere (eg Grouped for #groupByKey). Then we can have both Named
and
this  `markAsPartitioned` flag
be part of the general config object, which (a) does not expand the
API
surface area at all in this KIP, and (b)
also protects future KIPs from needing to have this same conversation
over
and over, because we can now
stick any additional operator properties into that same config object.

WDYT? By the way, the above idea (introducing a single config
object to
wrap all operator properties) was also
raised by John Roesler a while back. Let's hope he hasn't changed his
mind
since then :)


On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev
<iponoma...@mail.ru.invalid

wrote:

Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it
describes
what this operation is actually doing!

Hi Sophie,

I see the concern about poor code cohesion. We declare key mapping in
one place of code, then later in another place we say
"markAsPartitioned()". When we change the code six months later, we
might forget to remove markAsPartitioned(), especially if it's
placed in
another method or class. But I don't understand why do you propose to
include this config into Grouped/Joined/StreamJoined, because from
this
point of view it's not a better solution?

The best approach regarding the cohesion might be to to add an extra
'preservePartition' flag to every key-changing operation, that is

1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell if the provided mapping require repartition or not.
Indeed, this is a mapping operation property, not grouping one!
BTW: the
idea of adding extra parameter to `selectKey` was once coined by John
Roesler.

Arguments in favour for this approach: 1) better code cohesion
from the
point of view of the user, 2) 'smarter' code (the decision is taken
depending on metadata provided for all the upstream mappings), 3)
overall safer for the user.

Arguments against: invasive KStreams API change, 5 more method
overloads. Further on, when we add a new key-changing operation to
KStream, we must add an overloaded version with 'preservePartition'.
When we add a new overloaded version for existing operation, we
actually
might need to add two or more overloaded versions. This will soon
become
a mess.

I thought that since `markAsPartitioned` is intended for advanced
users,
they will use it with care. When you're in a position where every
serialization/deserialization round matters for the latency, you're
extremely careful with the topology and you will not thoughtlessly
add
new key-changing operations without controlling how it's going to
change
the overall topology.

By the way, if we later find a better solution, it's way more easy to
deprecate a single `markAsPartitioned` operation than 5 method
overloads.

What do you think?




04.08.2021 4:23, Sophie Blee-Goldman пишет:
Do we really need a whole DSL operator for this? I think the
original
name
for this
operator -- `cancelRepartition()` -- is itself a sign that this
is not
an
operation on the
stream itself but rather a command/request to whichever operator
would
have
otherwise triggered this repartition.

What about instead adding a new field to the
Grouped/Joined/StreamJoined
config
objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then
need
to
specify
this for every stateful operation downstream of the key-changing
operation.
So a
better alternative might be to introduce this `skipRepartition`
field,
or
whatever we
want to call it, to the config object of the operator that's
actually
doing
the key
changing operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal,
as the
user
has to
explicitly consider whether every key changing operation is indeed
preserving the
partitioning. Otherwise you could code up a topology with several
key
changing
operations at the beginning which do require repartitioning. Then
you
get
to the end
of the topology and insert one final key changing operation that
doesn't,
assume
you can just cancel the repartition, and suddenly you're
wondering why
your
results
are all screwed up

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org>
wrote:

Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control,
and
allow them to build more efficient application.

Not sure if I like the proposed named though (the good old "naming
things" discussion :))

Did you consider alternatives? What about

     - markAsPartitioned()
     - markAsKeyed()
     - skipRepartition()

Not sure if there are other idea on a good name?



-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
Hello,

I'd like to start a discussion for KIP-759:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling



This is an offshoot of the discussion of KIP-655 for a `distinct`
operator, which turned out to be a separate proposal.

The proposal is quite trivial, however, we still might consider
alternatives (see 'Possible Alternatives' section).

Regards,

Ivan









Reply via email to