Thanks John & Matthias. I have created a report with Confluent (https://github.com/confluentinc/schema-registry/issues/1061).
I will continue on with current work and we can resume the discussion, as
Matthias correctly indicates, in the PR.

Matthias, thank you for the link to KAFKA-7777. This is something that my
team has also come across, and I may be interested in pursuing a KIP on that
once this one is completed.

Thank you both again for your insight.

On Tue, Mar 19, 2019 at 2:19 PM John Roesler <j...@confluent.io> wrote:

> Chiming in...
>
> 1) Agreed. There is a technical reason 1:1 joins have to be co-partitioned,
> which does not apply to the many:1 join you've designed.
>
> 2) Looking at the Serializer interface, it unfortunately doesn't indicate
> whether the topic (or the value) is nullable. There are several places in
> Streams where we need to serialize a value for purposes other than sending
> it to a topic (KTableSuppressProcessor comes to mind), and using `null` for
> the topic is the convention we have. I think we should just use `null` for
> this case as well. Since we're doing this already, maybe we should document
> in the Serializer interface which parameters are nullable.
>
> It sounds like you're using the Confluent serde, and need it to support
> this usage. I'd recommend you just send a PR to that project independently.
>
> On Mon, Mar 18, 2019 at 7:13 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Just my 2 cents. Not sure if others see it differently:
> >
> > 1) it seems that we can lift the restriction on having the same number
> > of input topic partitions, and thus we should exploit this IMHO; don't
> > see why we should enforce an artificial restriction
> >
> > 2) for the value serde it's a little bit more tricky; in general, Apache
> > Kafka should not be concerned with third party tools. It seems that
> > https://issues.apache.org/jira/browse/KAFKA-7777 might provide a
> > solution though -- but it's unclear if KIP-213 and 7777 would be shipped
> > with the same release...
> >
> > > To me, this is a shortcoming of the Confluent Avro Serde that will
> > > likely need to be fixed on that side.
> >
> > I agree (good to know...)
> >
> > 3) I am not an expert on hashing, but 128-bit murmur3 sounds reasonable
> > to me
> >
> > Btw: I think we can have this discussion on the PR -- no need to concern
> > the mailing list (it's a lot of people that are subscribed).
> >
> > -Matthias
> >
> > On 3/17/19 5:20 PM, Adam Bellemare wrote:
> > > Hey folks
> > >
> > > I have been implementing the KIP as outlined in
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable,
> > > and I have run into a few points to consider that we did not include
> > > in the original.
> > >
> > > *1) Do all input topics need to have the same number of partitions or
> > > not?*
> > > Currently I have it designed such that they must, to be consistent
> > > with other joins. However, consider the following:
> > >
> > > TableA - 5 partitions
> > > TableB - 2 partitions
> > > Pre-subscribe Repartition Topic = 2 partitions, 2 RHS processors
> > > Post-Subscribe Repartition Topic = 5 partitions, 5 LHS processors
> > >
> > > Would this not be possible? Is there value in this flexibility? I have
> > > not looked deeper into the restrictions of this approach, so if there
> > > is something I should know I would appreciate a heads up.
> > >
> > > *2) Is it appropriate to use the KTable valueSerde during the
> > > computation of the hash?*
> > > To compute the hash I need to obtain an array of bytes, which is
> > > immediately possible by using the valueSerde. However, the Confluent
> > > Kafka Schema Registry serializer fails when it is being used in this
> > > way: In the hash generating code, I set topic to null because the data
> > > is not dependent on any topic value. I simply want the serialized
> > > bytes to input into the hash function.
> > >
> > >     byte[] preHashValue = serializer.serialize(null, data);  // topic = null
> > >
> > > Any KTable that is Consumed.with(Confluent-Key-Serde,
> > > Confluent-Value-Serde) will automatically try to register the schema
> > > to topic+"-key" and topic+"-value". If I pass in null, it tries to
> > > register to "-key" and "-value" each time the serializer is called,
> > > regardless of the class. In other words, it registers the schemas to a
> > > null topic and fails any subsequent serializations that aren't of the
> > > exact same schema. Note that this would be the case across ALL
> > > applications using the Confluent Schema Registry. To me, this is a
> > > shortcoming of the Confluent Avro Serde that will likely need to be
> > > fixed on that side. However, it does bring up the question - is this
> > > an appropriate way to use a serializer? Alternately, if I should NOT
> > > use the KTable value-serde to generate the byte array, does anyone
> > > have a better idea?
> > >
> > > *3) How big of a hash value do we need? Does the Foreign Key even
> > > matter for resolving?*
> > > I am currently looking at fast, non-cryptographically-secure hash
> > > options. We use murmur2 already in Kafka, but it is only 32 bits. I
> > > have been looking at murmur3hash as implemented in the Apache Hive
> > > project
> > > (https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/Murmur3.java)
> > > - it supports 128-bit hashes and is allegedly more performant than
> > > MD5. With a 128-bit hash, the birthday problem indicates that we would
> > > have roughly a 50% chance of a collision at about
> > > 2^64 = 1.8446744e+19 entries. I believe that the collision probability
> > > is small enough, especially given our narrow time window, that we
> > > should not expect a collision for a singly-keyed event. I think that
> > > there is no benefit in including the foreign key, but again, please
> > > let me know if this is wrong.
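
The birthday estimate in point 3 can be sanity-checked numerically. This is only a rough sketch using the standard approximation p ~= 1 - exp(-n^2 / (2 * 2^bits)); the class and method names are made up for illustration and are not part of Kafka or the KIP.

```java
public class BirthdayBoundSketch {

    /** Approximate probability of at least one collision among n uniformly random hashBits-bit hashes. */
    static double collisionProbability(double n, int hashBits) {
        double space = Math.pow(2.0, hashBits);
        // Birthday approximation; expm1 keeps precision when the probability is tiny.
        return -Math.expm1(-(n * n) / (2.0 * space));
    }

    /** Approximate number of entries at which the collision probability reaches p. */
    static double entriesForProbability(double p, int hashBits) {
        double space = Math.pow(2.0, hashBits);
        // Inverse of the approximation above: n = sqrt(2 * space * ln(1 / (1 - p))).
        return Math.sqrt(2.0 * space * -Math.log1p(-p));
    }

    public static void main(String[] args) {
        System.out.printf("32-bit,  50%% collision at ~%.3e entries%n", entriesForProbability(0.5, 32));
        System.out.printf("128-bit, 50%% collision at ~%.3e entries%n", entriesForProbability(0.5, 128));
        System.out.printf("128-bit, p(collision) at 2^64 entries ~%.3f%n",
                collisionProbability(Math.pow(2.0, 64), 128));
        System.out.printf("128-bit, p(collision) at 1e9 entries  ~%.3e%n",
                collisionProbability(1e9, 128));
    }
}
```

Under this approximation the 50% point for 128 bits is about 2.2e19 entries (a little above 2^64, where the probability is closer to 39%), so the order of magnitude quoted above holds. For 32 bits the 50% point is only around 77,000 entries.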
> > >
> > > Thanks All
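
For reference, the "serialize with a null topic, then hash the bytes" usage discussed in point 2 could look roughly like the sketch below. The nested Serializer interface is a local stand-in that only mirrors the shape of Kafka's serialize(topic, data) method so the example is self-contained, and MD5 from the JDK is swapped in as a 128-bit digest purely because the JDK ships no murmur3; neither is meant to represent the actual KIP-213 implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class NullTopicHashSketch {

    /** Local stand-in with the same shape as Kafka's Serializer#serialize(topic, data). */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Serializes the value with a null topic and returns a 128-bit digest of the resulting bytes. */
    static <T> byte[] hashValue(Serializer<T> serializer, T value) {
        // The bytes are not destined for any topic, so null is passed by convention.
        byte[] preHashValue = serializer.serialize(null, value);
        try {
            // MD5 is only a stand-in for a 128-bit murmur3 hash here.
            return MessageDigest.getInstance("MD5").digest(preHashValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest not available", e);
        }
    }

    public static void main(String[] args) {
        // A topic-agnostic serializer: it ignores the topic, so null is harmless.
        Serializer<String> utf8 = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        byte[] hash = hashValue(utf8, "some-value");
        System.out.println(hash.length * 8 + "-bit hash");
    }
}
```

A serializer that ignores the topic works unchanged with this pattern. One that derives state from the topic name, as described for the schema-registry serde above, is where a null topic becomes a problem.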