Just my 2 cents; not sure if others see it differently:

1) It seems that we can lift the restriction that all input topics have the
same number of partitions, and IMHO we should exploit this. I don't see why
we should enforce an artificial restriction.
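To make this concrete, here is a rough, hypothetical sketch using the counts from Adam's example below (TableA with 5 partitions, TableB with 2). It assumes both repartition hops use the default partitioner with String keys serialized as UTF-8. The murmur2 method is a transcription of the variant Kafka's default partitioner uses, and the class name and keys are made up for illustration. What it shows: the subscription is keyed by the foreign key, so it only has to be co-partitioned with TableB, while the response is keyed by the primary key, so it only has to be co-partitioned with TableA.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration (not KIP-213 code): where one LHS record is routed on each
// of the two repartition hops when TableA and TableB have different partition counts.
public class ForeignKeyRouting {

    // Intended to mirror the murmur2 variant used by Kafka's default partitioner.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length; // seed XOR length
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 0-3 trailing bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key serialized as UTF-8: positive hash modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int tableAPartitions = 5;         // LHS table, keyed by primary key
        int tableBPartitions = 2;         // RHS table, keyed by foreign key
        String primaryKey = "order-42";   // hypothetical LHS key
        String foreignKey = "customer-7"; // hypothetical foreign key taken from the LHS value

        // Hop 1: subscription keyed by the foreign key; the pre-subscribe topic
        // only needs TableB's partition count to reach the matching RHS row.
        int rhsPartition = partitionFor(foreignKey, tableBPartitions);
        // Hop 2: response keyed by the primary key; the post-subscribe topic
        // only needs TableA's partition count to reach the original LHS row.
        int lhsPartition = partitionFor(primaryKey, tableAPartitions);

        System.out.println("subscription -> pre-subscribe partition " + rhsPartition + " of " + tableBPartitions);
        System.out.println("response -> post-subscribe partition " + lhsPartition + " of " + tableAPartitions);
    }
}
```

Neither hop in this sketch needs the two tables to agree on a partition count.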
2) For the value serde, it's a bit trickier. In general, Apache Kafka should
not be concerned with third-party tools. It seems that
https://issues.apache.org/jira/browse/KAFKA-7777 might provide a solution,
though it's unclear whether KIP-213 and KAFKA-7777 would ship in the same
release.

> To me, this is a shortcoming of the Confluent Avro Serde
> that will likely need to be fixed on that side.

I agree (good to know...)

3) I am not an expert on hashing, but 128-bit murmur3 sounds reasonable to me.

Btw: I think we can have this discussion on the PR; there's no need to
involve the mailing list (a lot of people are subscribed).

-Matthias

On 3/17/19 5:20 PM, Adam Bellemare wrote:
> Hey folks,
>
> I have been implementing the KIP as outlined in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable,
> and I have run into a few points to consider that we did not include in the
> original.
>
> *1) Do all input topics need to have the same number of partitions?*
> Currently, I have designed it so that they must, to be consistent with
> other joins. However, consider the following:
> TableA - 5 partitions
> TableB - 2 partitions
> Pre-Subscribe Repartition Topic = 2 partitions, 2 RHS processors
> Post-Subscribe Repartition Topic = 5 partitions, 5 LHS processors
>
> Would this not be possible? Is there value in this flexibility? I have
> not looked deeper into the restrictions of this approach, so if there is
> something I should know, I would appreciate a heads-up.
>
> *2) Is it appropriate to use the KTable valueSerde when computing the
> hash?* To compute the hash I need an array of bytes, which I can obtain
> directly from the valueSerde. However, the Confluent Schema Registry
> serializer fails when it is used this way: in the hash-generating code,
> I set the topic to null because the data does not depend on any topic.
> I simply want the serialized bytes to feed into the hash function:
>
>     byte[] preHashValue = serializer.serialize(null, data); // topic is null
>
> Any KTable that is Consumed.with(Confluent-Key-Serde,
> Confluent-Value-Serde) will automatically try to register the schema under
> topic + "-key" and topic + "-value". If I pass in null, it tries to
> register under "-key" and "-value" every time the serializer is called,
> regardless of the class. In other words, it registers the schemas to a
> null topic and fails any subsequent serialization that isn't of the exact
> same schema. Note that this would be the case for ALL applications using
> the Confluent Schema Registry. To me, this is a shortcoming of the
> Confluent Avro Serde that will likely need to be fixed on that side.
> However, it does raise the question: is this an appropriate way to use a
> serializer? Alternatively, if I should NOT use the KTable value serde to
> generate the byte array, does anyone have a better idea?
>
> *3) How big a hash value do we need? Does the foreign key even matter
> for resolving?*
> I am currently looking at fast, non-cryptographically-secure hash
> options. We already use murmur2 in Kafka, but it is only 32 bits. I have
> been looking at Murmur3 as implemented in the Apache Hive project
> (https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/Murmur3.java);
> it supports 128-bit hashes and is reportedly faster than MD5.
> With a 128-bit hash, the birthday problem indicates a 50% chance of a
> collision only after roughly 2^64 (about 1.8 x 10^19) entries. I believe
> this probability is small enough, especially given our narrow time
> window, that we should not expect a collision for a singly-keyed event.
> I think there is no benefit in including the foreign key, but again,
> please let me know if this is wrong.
>
> Thanks all
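A rough sketch of the hashing step discussed in points 2) and 3): serialize the value to bytes, then take a 128-bit MurmurHash3 (x64 variant) of those bytes. The Function<V, byte[]> parameter is a hypothetical stand-in for calling the value Serializer with a null topic, and the class and method names are made up. The hash is a from-scratch transcription of the reference MurmurHash3_x64_128 algorithm, not Hive's Murmur3 class, and it has not been checked against Hive's output.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Sketch: 128-bit MurmurHash3 (x64 variant) over a value's serialized bytes.
public class ValueHash {
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    // Final avalanche step of MurmurHash3.
    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // Returns the 128-bit hash as {h1, h2}.
    static long[] murmur3x64_128(byte[] data, int seed) {
        final int len = data.length;
        final int nblocks = len / 16;
        long h1 = seed & 0xffffffffL;
        long h2 = seed & 0xffffffffL;
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);

        // Body: process 16-byte blocks as two little-endian longs.
        for (int i = 0; i < nblocks; i++) {
            long k1 = buf.getLong(i * 16);
            long k2 = buf.getLong(i * 16 + 8);
            k1 *= C1; k1 = Long.rotateLeft(k1, 31); k1 *= C2; h1 ^= k1;
            h1 = Long.rotateLeft(h1, 27); h1 += h2; h1 = h1 * 5 + 0x52dce729;
            k2 *= C2; k2 = Long.rotateLeft(k2, 33); k2 *= C1; h2 ^= k2;
            h2 = Long.rotateLeft(h2, 31); h2 += h1; h2 = h2 * 5 + 0x38495ab5;
        }

        // Tail: remainder bytes 8..14 feed k2, bytes 0..7 feed k1 (little-endian).
        int tail = nblocks * 16;
        int rem = len & 15;
        long k1 = 0, k2 = 0;
        for (int j = 8; j < rem; j++) {
            k2 ^= (long) (data[tail + j] & 0xff) << ((j - 8) * 8);
        }
        if (rem > 8) {
            k2 *= C2; k2 = Long.rotateLeft(k2, 33); k2 *= C1; h2 ^= k2;
        }
        for (int j = 0; j < Math.min(rem, 8); j++) {
            k1 ^= (long) (data[tail + j] & 0xff) << (j * 8);
        }
        if (rem > 0) {
            k1 *= C1; k1 = Long.rotateLeft(k1, 31); k1 *= C2; h1 ^= k1;
        }

        // Finalization: fold in the length, then mix both halves.
        h1 ^= len; h2 ^= len;
        h1 += h2; h2 += h1;
        h1 = fmix64(h1); h2 = fmix64(h2);
        h1 += h2; h2 += h1;
        return new long[] {h1, h2};
    }

    // Hypothetical helper: 'serialize' stands in for valueSerializer.serialize(null, value).
    static <V> long[] hashValue(V value, Function<V, byte[]> serialize) {
        return murmur3x64_128(serialize.apply(value), 0);
    }

    public static void main(String[] args) {
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        long[] h = hashValue("{\"customer\":\"customer-7\",\"amount\":10}", utf8);
        System.out.printf("%016x%016x%n", h[0], h[1]);
    }
}
```

On the collision estimate: with 128-bit output, the standard approximation p = 1 - exp(-n^2 / 2^129) gives about 39% at n = 2^64 and reaches 50% at roughly 1.18 x 2^64 (about 2.2 x 10^19) entries, so "on the order of 2^64" is the right ballpark.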
