Hello Jeff,

You can actually change the message key before the join. For example, let's
say your stream A has the format of

    key: {a}, value: {b, c}

And stream B has the format of

    key: {d}, value: {b, e}

And you want to join on the shared value field {b}. You can write in the DSL:

    // now key: {b}, value: {a, b, c}
    streamA' = streamA.map((key, value) -> new KeyValue(value.b, {key, value}));
    // now key: {b}, value: {d, b, e}
    streamB' = streamB.map((key, value) -> new KeyValue(value.b, {key, value}));

    // key: {b}, value: { joined(a, b, c, d, e) }
    joined = streamA'.join(streamB', joiner);
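
To make the re-key-then-join idea concrete, here is a toy, in-memory Java
sketch of the same flow. It is not the Kafka Streams API: the record types
(KeyValue, ValueA, ValueB, RekeyedA, RekeyedB, Joined) are hypothetical
stand-ins for the {a}, {b, c}, {d}, {b, e} shapes above, it needs Java 16+
for records, and the join ignores the time windows a real stream-stream join
would apply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of "re-key both streams by b, then join on b".
// This is NOT the Kafka Streams API; all types below are hypothetical stand-ins.
public class RekeyJoinSketch {

    record KeyValue<K, V>(K key, V value) {}

    // Original shapes: A is key {a}, value {b, c}; B is key {d}, value {b, e}.
    record ValueA(String b, String c) {}
    record ValueB(String b, String e) {}

    // After re-keying, the old key moves into the value so nothing is lost.
    record RekeyedA(String a, String b, String c) {}
    record RekeyedB(String d, String b, String e) {}
    record Joined(String a, String b, String c, String d, String e) {}

    // Mirrors streamA.map((key, value) -> new KeyValue(value.b, {key, value})).
    static List<KeyValue<String, RekeyedA>> rekeyA(List<KeyValue<String, ValueA>> streamA) {
        List<KeyValue<String, RekeyedA>> out = new ArrayList<>();
        for (KeyValue<String, ValueA> kv : streamA) {
            ValueA v = kv.value();
            out.add(new KeyValue<>(v.b(), new RekeyedA(kv.key(), v.b(), v.c())));
        }
        return out;
    }

    // Same re-keying for stream B.
    static List<KeyValue<String, RekeyedB>> rekeyB(List<KeyValue<String, ValueB>> streamB) {
        List<KeyValue<String, RekeyedB>> out = new ArrayList<>();
        for (KeyValue<String, ValueB> kv : streamB) {
            ValueB v = kv.value();
            out.add(new KeyValue<>(v.b(), new RekeyedB(kv.key(), v.b(), v.e())));
        }
        return out;
    }

    // Inner join on the new key {b}. Time windows are ignored here;
    // records are matched purely on equal keys (a simple hash join).
    static List<Joined> join(List<KeyValue<String, RekeyedA>> left,
                             List<KeyValue<String, RekeyedB>> right) {
        // Index the right side by key.
        Map<String, List<RekeyedB>> index = new HashMap<>();
        for (KeyValue<String, RekeyedB> kv : right) {
            index.computeIfAbsent(kv.key(), k -> new ArrayList<>()).add(kv.value());
        }
        // Probe the index with each left record and emit one row per match.
        List<Joined> out = new ArrayList<>();
        for (KeyValue<String, RekeyedA> kv : left) {
            RekeyedA va = kv.value();
            for (RekeyedB vb : index.getOrDefault(kv.key(), List.of())) {
                out.add(new Joined(va.a(), va.b(), va.c(), vb.d(), vb.e()));
            }
        }
        return out;
    }
}
```

Folding the old key into the value is what lets the joined record still carry
{a} and {d} after the key has been replaced by {b}.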


Kafka Streams will automatically re-partition both streams on field {b}
behind the scenes, through an internal topic.
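
The property that makes this re-partitioning work is that both re-keyed
streams are written with the same partitioner into the same number of
partitions, so two records with the same {b} always land in the same
partition and therefore reach the same task. The toy sketch below shows only
that property. partitionFor and repartition are hypothetical helpers, not
Kafka API, and String.hashCode() is a stand-in: Kafka's default partitioner
hashes the serialized key bytes with murmur2.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the repartition step; NOT Kafka's partitioner or topic API.
public class RepartitionSketch {

    // Stand-in partitioner: deterministic in the key, so equal keys
    // always map to the same partition index.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Distribute keys into numPartitions buckets, like writing them to a topic.
    static List<List<String>> repartition(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        return partitions;
    }
}
```

Repartitioning the {b} keys of stream A and of stream B with the same
numPartitions puts every shared key at the same partition index on both sides,
which is exactly the co-partitioning the join needs.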

Does this work for your case?

Guozhang


On Fri, Mar 18, 2016 at 12:31 PM, Jeff Klukas <[email protected]> wrote:

> I'm experimenting with the Kafka Streams preview and understand that joins
> can only happen between KStreams and/or KTables that are co-partitioned.
> This is a reasonable limitation necessary to support large streams.
>
> What if I have a small topic, though, that I'd like to be able to join
> based on values in a stream's messages rather than the partition key? Could
> there be a concept of a fully replicated KTable where every thread in my
> Kafka Streams application would read a full copy into memory to be
> available for joins without the restriction on shared keys?
>
> I could probably achieve the effect I want by implementing a consumer in a
> separate thread to read the topic into RocksDB. I would then do lookups
> from that separate RocksDB instance in "map" operations within my Kafka
> Streams application.
>
> Is there an easier alternative that I'm missing? It would be nice to have a
> standard mechanism for maintaining small topics like these and making them
> available for joins without key restrictions.
>



