Hey folks

I have been implementing the KIP as outlined in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable,
and I have run into a few points to consider that we did not include in the
original.

*1) Do all input topics need to have the same partitions or not?* Currently
I have it designed such that it must, to be consistent with other joins.
However, consider the following:
TableA - 5 partitions
TableB - 2 partitions
Pre-subscribe Repartition Topic = 2 partitions, 2 RHS processors
Post-Subcribe Repartition Topic = 5 partitions, 5 LHS processors

Would this not be possible? Is there a value in flexibility to this? I have
not looked deeper into the restrictions of this approach, so if there is
something I should know I would appreciate a heads up.

*2) Is it appropriate to use the KTable valueSerde during the computation
of the hash?* To compute the hash I need to obtain an array of bytes, which
is immediately possible by  using the valueSerde. However, the Confluent
Kafka Schema Registry serializer fails when it is being used in this way:
In the hash generating code, I set topic to null because the data is not
dependent on any topic value. I simply want the serialized bytes to input
into the hash function.
*    byte[] preHashValue = serializer.serialize(topic = null, data)*

Any KTable that is Consumed.with(Confluent-Key-Serde,
Confluent-Value-Serde) will automatically try to register the schema to
topic+"-key" and topic+"-value". If I pass in null, it tries to register to
"-key" and "-value" each time the serializer is called, regardless of the
class. In other words, it registers the schemas to a null topic and fails
any subsequent serializations that aren't of the exact same schema. Note
that this would be the case across ALL applications using the confluent
schema registry. To me, this is a shortcoming of the Confluent Avro Serde
that will likely need to be fixed on that side. However, it does bring up
the question - is this an appropriate way to use a serializer? Alternately,
if I should NOT use the KTable value-serde to generate the byte array, does
anyone have a better idea?

*3) How big of a hash value do we need? Does the Foreign Key even matter
for resolving?*
I am currently looking at fast, non-cryptologically-secure hash options. We
use murmur2 already in Kafka, but it is only 32 bits. I have been looking
at murmur3hash as implemented in the Apache Hive project (
https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/Murmur3.java)
- it supports 128-bit hashes and is allegedly more performant than MD5.
With a 128-bit hash. The birthday problem indicates that we would have a
50% chance of a collision with 2^64 = 1.8446744e+19 entries. I believe that
this is sufficiently small, especially for our narrow time window, to
expect a collision for a singly-keyed event. I think that there is no
benefit in including the foreign key, but again, please let me know if this
is wrong.


Thanks All

Reply via email to