[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352103#comment-15352103 ]
Jan Filipiak commented on KAFKA-3705:
-------------------------------------
I will just shoot a quick reply now; time has somehow become sparse recently.
Anyhow, the bottom line of our misunderstandings is always the same thing. My
bad that I didn't see the wiki page; if the range-query interface is addressed
there, that's nice :D.
Point 3 is the one that causes the most confusion, I guess. In the repartition
case we follow different paths, and I am not sure I was able to communicate
mine well enough. I <3 the idea of having everything be a derived store. In the
end all of this is being used to tail -F mysql-XXXX.bin | kafka | XXX | redis,
so Redis becomes a derived store of MySQL which can be used for NoSQL-style
reads. I am in fact such a great fan of this concept that I tend to treat
everything as a derived store. For me this means a repartitioned topic is a
derived store of the source topic. This stands in contrast to making a
changelog out of it and materializing the changelog in, say, RocksDB. That
leads to the "problem" that the changelog topic is not a derived store anymore,
which gives me a personally bad feeling; it just pushes me out of my comfort
zone. The Confluent folks seem to be in their comfort zone with changelogging
topics. In my narrative, shit hits the fan when the property of being a derived
store is lost. It leads to all the nasty things, like needing to changelog
your, say, RocksDBs, since the intermediate topic won't hold the data forever.
In contrast to having a changelogging topic that I re-materialize and then
change-capture again, I prefer to do the change capturing first and only
maintain the state of which downstream partition a record is currently
published to. This works cleanly and nicely, but brings with it what I call
"key widening". Say I have KTable A and I want to repartition it to A', so that
the topic containing A' is a derived store and log-compacted. Then I can't use
Key<A> to do this, for two reasons. First, the StreamPartitioner can only
access the key to determine the partition to delete from (deletes come as null
values), which means the fields that determine the partition need to be in the
key no matter what. Snippet:
{code:java}
topology.addSink(name, repartitionTopicName, new StreamPartitioner<K, VR>() {

    private final Serializer<KL> intermediateSerializer = intermediateSerde.serializer();

    @Override
    public Integer partition(K key, VR value, int numPartitions) {
        KL newKey = intermediateKeyExtractor.apply(key);
        // Hashing copied from the DefaultPartitioner; didn't want to create a
        // Cluster object here just to reuse it. Mask the hash to positive
        // before the modulo so a negative murmur2 result can't yield an
        // out-of-range partition.
        return (Utils.murmur2(intermediateSerializer.serialize(repartitionTopicName, newKey)) & 0x7fffffff)
                % numPartitions;
    }
}, repartitionProcessorName);
{code}
As you can see, the resulting key K contains the KL (the key of the
non-repartitioned table).
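Just to make the "key widening" concrete, here is a minimal sketch of what such
a widened key could look like. The name CombinedKey and its field names are
mine, purely for illustration, not from any actual patch:
{code:java}
// Illustrative only: a widened key that carries both the fields that determine
// the target partition and the original key of the source table, so a delete
// can be routed to the right partition and two records that "moved" between
// partitions never collapse onto the same key.
public final class CombinedKey<FK, K0> {
    public final FK partitionKey;  // the extracted fields that pick the partition
    public final K0 originalKey;   // the key of the not-repartitioned table

    public CombinedKey(FK partitionKey, K0 originalKey) {
        this.partitionKey = partitionKey;
        this.originalKey = originalKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) {
            return false;
        }
        CombinedKey<?, ?> other = (CombinedKey<?, ?>) o;
        return partitionKey.equals(other.partitionKey)
                && originalKey.equals(other.originalKey);
    }

    @Override
    public int hashCode() {
        return 31 * partitionKey.hashCode() + originalKey.hashCode();
    }
}
{code}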
The second reason why this key must be there is that one needs to be able to
build a derived stream A''. Since in A' a record can "move" from partition X to
Y, there is a race condition between the "insert" in Y and the delete in X. The
repartition processor repartitioning for A'' needs to treat them as different
keys; if it were the same key, the delete might wipe the new value. This also
puts downstream consumers of A'' in the weird position that, at any point in
time, there can be as many A-keys with the same value as there are A'
partitions minus one, or a specific A-key might vanish completely and then
reappear. That is sometimes awkward to work around in the end application.
There are enough strategies to solve at least the multiple-A-keys case, not so
much the complete-vanish case. I hope this clarifies things.
> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Liquan Pei
> Labels: api
> Fix For: 0.10.1.0
>
>
> Today in the Kafka Streams DSL, KTable joins are only based on keys. If users
> want to join a KTable A with key {{a}} to another KTable B with key {{b}} but
> on a "foreign key" {{a}}, and assuming they are read from two topics which
> are partitioned on {{a}} and {{b}} respectively, they need to use the
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already
> partitioned on {{a}}, users still need to do the pre-aggregation in order to
> get the two joining streams onto the same key. This is a drawback in
> programmability and we should fix it.