Chiming in...

1) Agreed. There is a technical reason 1:1 joins have to be co-partitioned,
which does not apply to the many:1 join you've designed.

2) Looking at the Serializer interface, it unfortunately doesn't indicate
whether the topic (or the value) is nullable. There are several places in
Streams where we need to serialize a value for purposes other than sending
it to a topic (KTableSuppressProcessor comes to mind), and using `null` for
the topic is the convention we have. I think we should just use `null` for
this case as well. Since we're doing this already, maybe we should document
in the Serializer interface which parameters are nullable.
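
As a rough illustration of that convention (a sketch only: the
`Serializer` interface below is a local stand-in with the same method
shape as Kafka's, so the example compiles without the Kafka client jar,
and `Utf8Serializer` and `bytesForHashing` are hypothetical names):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class NullTopicSerialization {

    // Local stand-in mirroring the shape of Kafka's serialize(topic, data).
    // Assumption for this sketch: the topic argument may be null.
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical serializer that never reads the topic, so null is harmless.
    public static final class Utf8Serializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Serialize for a purpose other than producing to a topic (for example,
    // to feed a hash function): by convention the topic is passed as null.
    public static <T> byte[] bytesForHashing(Serializer<T> serializer, T value) {
        return serializer.serialize(null, value);
    }

    public static void main(String[] args) {
        byte[] bytes = bytesForHashing(new Utf8Serializer(), "value-1");
        System.out.println(Arrays.toString(bytes));
    }
}
```

A serializer that derives something from the topic name (a schema
subject, say) would need an explicit null check to support this usage.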

It sounds like you're using the Confluent serde, and need it to support
this usage. I'd recommend you just send a PR to that project independently.
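
On the hash size in (3) below, a quick back-of-the-envelope check may
help. This is only a sketch: it uses the standard birthday approximation
p ~= 1 - exp(-k^2 / (2N)) and assumes hash outputs are uniformly
distributed; `BirthdayBound` and `collisionProbability` are names made
up for the example.

```java
public class BirthdayBound {

    // Approximate probability of at least one collision among k values drawn
    // uniformly from 2^bits possible outputs: p ~= 1 - exp(-k^2 / (2 * 2^bits)).
    public static double collisionProbability(double k, int bits) {
        double n = Math.pow(2.0, bits);
        // -expm1(-x) computes 1 - exp(-x) without losing precision for small x.
        return -Math.expm1(-(k * k) / (2.0 * n));
    }

    public static void main(String[] args) {
        double k = Math.pow(2.0, 64); // number of entries
        System.out.println(collisionProbability(k, 128));
    }
}
```

Under that approximation, 2^64 entries with a 128-bit hash gives a
collision probability of about 0.39, so "50% at 2^64" is the right order
of magnitude rather than an exact figure.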

On Mon, Mar 18, 2019 at 7:13 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Just my 2 cents. Not sure if others see it differently:
>
> 1) it seems that we can lift the restriction on having the same number
> of input topic partitions, and thus we should exploit this IMHO; don't
> see why we should enforce an artificial restriction
>
>
> 2) for the value serde it's a little bit more tricky; in general, Apache
> Kafka should not be concerned with third party tools. It seems that
> https://issues.apache.org/jira/browse/KAFKA-7777 might provide a
> solution though -- but it's unclear if KIP-213 and 7777 would be shipped
> with the same release...
>
> > To me, this is a shortcoming of the Confluent Avro Serde
> > that will likely need to be fixed on that side.
>
> I agree (good to know...)
>
>
> 3) I am not an expert on hashing, but 128-bit murmur3 sounds reasonable
> to me
>
>
>
> Btw: I think we can have this discussion on the PR -- no need to concern
> the mailing list (it's a lot of people that are subscribed).
>
>
>
> -Matthias
>
> On 3/17/19 5:20 PM, Adam Bellemare wrote:
> > Hey folks
> >
> > I have been implementing the KIP as outlined in
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable,
> > and I have run into a few points to consider that we did not include in
> > the original.
> >
> > *1) Do all input topics need to have the same partitions or not?*
> > Currently I have it designed such that it must, to be consistent with
> > other joins. However, consider the following:
> > TableA - 5 partitions
> > TableB - 2 partitions
> > Pre-subscribe Repartition Topic = 2 partitions, 2 RHS processors
> > Post-Subscribe Repartition Topic = 5 partitions, 5 LHS processors
> >
> > Would this not be possible? Is there a value in flexibility to this? I
> > have not looked deeper into the restrictions of this approach, so if
> > there is something I should know I would appreciate a heads up.
> >
> > *2) Is it appropriate to use the KTable valueSerde during the computation
> > of the hash?* To compute the hash I need to obtain an array of bytes,
> > which is immediately possible by using the valueSerde. However, the
> > Confluent Kafka Schema Registry serializer fails when it is used in this
> > way. In the hash generating code, I set topic to null because the data is
> > not dependent on any topic value. I simply want the serialized bytes to
> > input into the hash function.
> >     byte[] preHashValue = serializer.serialize(null, data); // topic is null
> >
> > Any KTable that is Consumed.with(Confluent-Key-Serde,
> > Confluent-Value-Serde) will automatically try to register the schema to
> > topic+"-key" and topic+"-value". If I pass in null, it tries to register
> > to "-key" and "-value" each time the serializer is called, regardless of
> > the class. In other words, it registers the schemas to a null topic and
> > fails any subsequent serializations that aren't of the exact same schema.
> > Note that this would be the case across ALL applications using the
> > Confluent schema registry. To me, this is a shortcoming of the Confluent
> > Avro Serde that will likely need to be fixed on that side. However, it
> > does bring up the question: is this an appropriate way to use a
> > serializer? Alternately, if I should NOT use the KTable value-serde to
> > generate the byte array, does anyone have a better idea?
> >
> > *3) How big of a hash value do we need? Does the Foreign Key even matter
> > for resolving?*
> > I am currently looking at fast, non-cryptographically-secure hash options.
> > We use murmur2 already in Kafka, but it is only 32 bits. I have been
> > looking at murmur3hash as implemented in the Apache Hive project
> > (https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/Murmur3.java);
> > it supports 128-bit hashes and is allegedly more performant than MD5.
> > With a 128-bit hash, the birthday problem indicates that we would have a
> > roughly 50% chance of a collision with 2^64 = 1.8446744e+19 entries. I
> > believe that the collision probability is sufficiently small, especially
> > for our narrow time window, that we should not expect a collision for a
> > singly-keyed event. I think that there is no benefit in including the
> > foreign key, but again, please let me know if this is wrong.
> >
> >
> > Thanks All
> >
>
>
