lucasbru commented on code in PR #17756:
URL: https://github.com/apache/kafka/pull/17756#discussion_r1867478628

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+@FunctionalInterface
+public interface ForeignKeyExtractor<K, V, KO> {

Review Comment:
   Maybe a little javadoc comment describing what this interface is used for could be helpful.

##########
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala:
##########
@@ -643,6 +644,26 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
+   * table are joined according to the result of keyExtractor on the other KTable.
+   *
+   * @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this table's key and value
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: BiFunction[K, V, KO],

Review Comment:
   I don't think in Scala you'd use the `BiFunction` class, but probably something like `(K, V) => KO`, which is an instance of `Function2`. I think there are implicits to convert between the two, but this would definitely make the API look more "scalaesque".

##########
streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java:
##########
@@ -2268,6 +2417,32 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                         final TableJoined<K, KO> tableJoined,
                                         final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
+     * using the {@link TableJoined} instance for optional configurations including
+     * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (K, V). If the

Review Comment:
   This should not link to `Function` but to `BiFunction`.
   Also "from this table's value" should be "from a key-value pair in this table" I guess? Seems like you did this for the other overloads.
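To illustrate the javadoc request on `ForeignKeyExtractor` and the "key-value pair" wording together, a doc comment along these lines might work. This is only a hypothetical sketch: the method name `extract` and the adapters `fromFunction`/`fromBiFunction` are assumptions (the `Function`/`BiFunction` imports in the diff suggest something similar, but the PR's actual signatures may differ), and the interface is nested in a demo class only so the snippet compiles on its own.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

public class ForeignKeyExtractorSketch {

    /**
     * Extracts the foreign key ({@code KO}) from a key-value pair in this (left-hand) table
     * of a foreign-key join. The extracted key is used to look up the matching record in the
     * other (right-hand) table, which is keyed by {@code KO}.
     * <p>
     * Hypothetical sketch: {@code extract}, {@code fromFunction} and {@code fromBiFunction}
     * are assumed names, not necessarily the API introduced by the PR.
     *
     * @param <K>  key type of this table
     * @param <V>  value type of this table
     * @param <KO> key type of the other table, i.e. the foreign key
     */
    @FunctionalInterface
    interface ForeignKeyExtractor<K, V, KO> {

        /** Returns the foreign key for the given key-value pair of this table. */
        KO extract(K key, V value);

        /** Adapts a value-only extractor; the record key is ignored. */
        static <K, V, KO> ForeignKeyExtractor<K, V, KO> fromFunction(final Function<V, KO> function) {
            return (key, value) -> function.apply(value);
        }

        /** Adapts an extractor that needs both the record key and the record value. */
        static <K, V, KO> ForeignKeyExtractor<K, V, KO> fromBiFunction(final BiFunction<K, V, KO> biFunction) {
            return biFunction::apply;
        }
    }

    public static void main(final String[] args) {
        // Hypothetical data: orders keyed by "<region>-<orderId>", value "<customerId>:<item>".
        final ForeignKeyExtractor<String, String, String> valueOnly =
            ForeignKeyExtractor.fromFunction(value -> value.split(":")[0]);

        // Hypothetical case where the foreign key also depends on the record key (the region prefix).
        final ForeignKeyExtractor<String, String, String> keyAndValue =
            ForeignKeyExtractor.fromBiFunction((key, value) -> key.split("-")[0] + "/" + value.split(":")[0]);

        System.out.println(valueOnly.extract("eu-42", "c7:book"));   // prints c7
        System.out.println(keyAndValue.extract("eu-42", "c7:book")); // prints eu/c7
    }
}
```

One possible design choice here: keeping a value-only adapter would let the existing `Function`-based overloads delegate to the `BiFunction`-based ones, so only one code path has to handle the key-value pair.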
