Hey folks,

I have been implementing the KIP as outlined in https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable, and I have run into a few points to consider that we did not include in the original.

*1) Do all input topics need to have the same partitions or not?*

Currently I have it designed such that they must, to be consistent with other joins. However, consider the following:

TableA - 5 partitions
TableB - 2 partitions
Pre-Subscribe Repartition Topic = 2 partitions, 2 RHS processors
Post-Subscribe Repartition Topic = 5 partitions, 5 LHS processors

Would this not be possible? Is there value in allowing this flexibility? I have not looked deeper into the restrictions of this approach, so if there is something I should know I would appreciate a heads up.

*2) Is it appropriate to use the KTable valueSerde during the computation of the hash?*

To compute the hash I need to obtain an array of bytes, which is immediately possible by using the valueSerde. However, the Confluent Kafka Schema Registry serializer fails when it is used in this way. In the hash-generating code, I set topic to null because the data does not depend on any topic value. I simply want the serialized bytes to feed into the hash function:

*byte[] preHashValue = serializer.serialize(topic = null, data)*

Any KTable that is Consumed.with(Confluent-Key-Serde, Confluent-Value-Serde) will automatically try to register the schema to topic+"-key" and topic+"-value". If I pass in null, it tries to register to "-key" and "-value" each time the serializer is called, regardless of the class. In other words, it registers the schemas to a null topic and fails any subsequent serializations that aren't of the exact same schema. Note that this would be the case across ALL applications using the Confluent Schema Registry.

To me, this is a shortcoming of the Confluent Avro Serde that will likely need to be fixed on that side. However, it does bring up the question - is this an appropriate way to use a serializer? Alternatively, if I should NOT use the KTable value serde to generate the byte array, does anyone have a better idea?

*3) How big of a hash value do we need?
Does the Foreign Key even matter for resolving?*

I am currently looking at fast, non-cryptographically-secure hash options. We already use murmur2 in Kafka, but it is only 32 bits. I have been looking at the Murmur3 hash as implemented in the Apache Hive project ( https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/Murmur3.java) - it supports 128-bit hashes and is allegedly more performant than MD5.

With a 128-bit hash, the birthday problem indicates that we would have roughly a 50% chance of a collision at around 2^64 = 1.8446744e+19 entries. I believe that this probability is sufficiently small, especially for our narrow time window, that we should not expect a collision for a singly-keyed event. I think that there is no benefit in including the foreign key, but again, please let me know if this is wrong.

Thanks All
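
P.S. For anyone who wants to sanity-check the birthday numbers in (3), here is a rough sketch. It assumes the hash output is uniformly distributed and uses the usual approximation p ~= 1 - exp(-n^2 / 2^(b+1)) for n entries and a b-bit hash. The class and method names are made up for illustration and are not part of the KIP or of Kafka.

```java
// Back-of-the-envelope birthday-bound calculator (illustrative only).
// Assumption: hash values are uniformly distributed over 2^hashBits outputs.
final class BirthdayBound {
    private BirthdayBound() {}

    /** Approximate probability of at least one collision among n hashes. */
    static double collisionProbability(double n, int hashBits) {
        // p ~= 1 - exp(-n^2 / 2^(hashBits + 1))
        double exponent = -(n * n) / Math.pow(2.0, hashBits + 1);
        return -Math.expm1(exponent); // 1 - e^exponent, stable for tiny values
    }

    /** Approximate number of entries at which the collision probability reaches p. */
    static double entriesForProbability(double p, int hashBits) {
        // n ~= sqrt(2^(hashBits + 1) * ln(1 / (1 - p)))
        return Math.sqrt(Math.pow(2.0, hashBits + 1) * -Math.log1p(-p));
    }

    public static void main(String[] args) {
        double twoTo64 = Math.pow(2.0, 64);
        System.out.printf("128-bit, n = 2^64: p = %.4f%n",
                collisionProbability(twoTo64, 128));
        System.out.printf("128-bit, p = 0.5:  n = %.4e%n",
                entriesForProbability(0.5, 128));
        System.out.printf("32-bit,  p = 0.5:  n = %.0f%n",
                entriesForProbability(0.5, 32));
    }
}
```

Under this approximation the collision probability at exactly 2^64 entries is about 39%, and the 50% point sits at roughly 1.18 * 2^64, so the order of magnitude quoted above holds. For comparison, a 32-bit hash such as murmur2 reaches the 50% point at only about 77 thousand entries.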