Sachin,

Just a side note in addition to what Matthias mentioned: in the upcoming 0.10.1.0 release, Kafka Streams adds auto-repartitioning by detecting whether the message keys are joinable or not. To give a few examples:
-------
stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2");
stream1.join(stream2);
// Streams will check if topic1 and topic2 are co-partitioned
// (i.e. have the same number of partitions; if not, it errors out)
-------
stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2").selectKey(...);
stream1.join(stream2);
// Streams will assume that stream2 is not co-partitioned with stream1 since
// its key has changed, and hence will auto-repartition stream2. In other
// words, it is rewritten behind the scenes as:

stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2").selectKey(...).through("repartition-topic");
// "repartition-topic" is created on the fly with the same number of
// partitions as topic1.
stream1.join(stream2);
-------

Stay tuned for the release docs, which will cover these new features in the upgrade / API changes section.

Guozhang

On Sun, Oct 9, 2016 at 12:20 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
> Thanks for pointing this out.
> I am doing exactly like this now and it is working fine.
>
> Sachin
>
>
> On Sun, Oct 9, 2016 at 12:32 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > You must ensure that both streams are co-partitioned (i.e., same number
> > of partitions and partitioned by the join key).
> >
> > (See the "Note" box:
> > http://docs.confluent.io/3.0.1/streams/developer-guide.html#joining-streams)
> >
> > You can enforce co-partitioning by introducing a call to .through()
> > before doing the join (on either one or both input streams). You need to
> > insert .through() for an input stream if you did (potentially) modify
> > the key (e.g., you applied .selectKey(), map(), or flatMap() before the
> > join).
> >
> > If one stream's key is not modified, it is sufficient to only
> > re-distribute the other stream via .through(). Also keep in mind that
> > you should create the topic used in .through() manually, with the right
> > number of partitions, before you start your application.
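To see why the co-partitioning check above matters, here is a minimal plain-Java sketch (no Kafka dependency). The partitioner below is a hypothetical stand-in: it uses String.hashCode(), whereas Kafka's default partitioner hashes the serialized key bytes with murmur2, so the concrete partition numbers in real deployments will differ. The point it illustrates is only that the same key can map to different partition indices when the two topics have different partition counts, so a local join would never see the matching records together.

```java
public class CoPartitionSketch {
    // Hypothetical stand-in for Kafka's default partitioner: real Kafka
    // applies murmur2 to the serialized key bytes, not hashCode().
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Co-partitioned case: both topics have 4 partitions, so the join
        // key lands on the same partition index in both topics.
        System.out.println(partitionFor("d", 4) == partitionFor("d", 4));

        // Mismatched case: 4 vs 6 partitions; the same key can land on
        // different partition indices, so the records would never meet.
        System.out.println(partitionFor("d", 4)); // "d".hashCode() == 100 -> 0
        System.out.println(partitionFor("d", 6)); // -> 4
    }
}
```

This is why inserting .through("repartition-topic") (with a partition count matching the other input) restores joinability: it rewrites all records into a topic partitioned by the current key under a single, common partition count.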
> >
> > -Matthias
> >
> > On 10/08/2016 08:54 AM, Sachin Mittal wrote:
> > > I don't think that is the issue.
> > > The join API says:
> > >
> > > public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> > >
> > > In my case V is Map<String, List<X>>,
> > > V1 is List<X>,
> > > R is Map<String, List<X>>,
> > > K is String.
> > >
> > > Note that the final V and V1 are arrived at after doing transformations on the
> > > original streams <String, Y>.
> > >
> > > So there are intermediate steps like
> > > stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
> > > and
> > > table.mapValues(new ValueMapper<X, Map<String, X>>())
> > >
> > > So whenever I modify the structure of a stream or table, do I need to back
> > > it up with a new Kafka topic by calling through("new-mapped-topic")?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgai...@hotmail.com> wrote:
> > >
> > >>> From: sjmit...@gmail.com
> > >>> Date: Sat, 8 Oct 2016 15:30:33 +0530
> > >>> Subject: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
> > >>> To: users@kafka.apache.org
> > >>>
> > >>> Hi,
> > >>> I am getting this exception:
> > >>>
> > >>> Exception in thread "main"
> > >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> > >>> building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009 are
> > >>> not joinable
> > >>>
> > >> MG>Look at the join declaration in
> > >> org.apache.kafka.streams.kstream.internals.KTableImpl.java:
> > >> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> > >> MG>The join method assumes two collections that exactly match the generic
> > >> declaration of the join method.
> > >>
> > >> MG>KTable<String, Map<String, List<V1>>> != KTable<String, List<V1>> (the 2nd
> > >> parameter is missing both the V and R declarators)
> > >> MG>You could establish a new collection of KTable<String, List<V1>>
> > >>
> > >> MG>and then *join* KTable<String, Map<String, List<V1>>> with
> > >> KTable<String, List<V1>> through a custom join method.
> > >>
> > >>> What I am trying to do is aggregate a KStream into a KTable of type
> > >>> KTable<String, Map<String, List<V>>>
> > >>>
> > >>> and I am trying to join it to another KStream which is aggregated into
> > >>> another KTable of type
> > >>> KTable<String, List<V>>
> > >>>
> > >>> Since the keys of both final KTables are the same, I don't understand why it
> > >>> is giving this exception.
> > >>>
> > >>> Thanks
> > >>> Sachin

--
-- Guozhang
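As a footnote to the type discussion above: the generics in Sachin's setup do type-check, which is consistent with the "not joinable" error being the runtime co-partitioning check Matthias described rather than a compile-time type mismatch. A minimal plain-Java sketch (the ValueJoiner interface below is a local stand-in mirroring the shape of Kafka Streams' interface, not the real class; the merge rule is hypothetical):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinTypesSketch {
    // Local stand-in mirroring the shape of Kafka Streams' ValueJoiner.
    interface ValueJoiner<V, V1, R> {
        R apply(V value1, V1 value2);
    }

    // V = Map<String, List<Integer>>, V1 = List<Integer>,
    // R = Map<String, List<Integer>> -- the same shape as in the thread.
    static final ValueJoiner<Map<String, List<Integer>>, List<Integer>,
                             Map<String, List<Integer>>> JOINER =
        (map, list) -> {
            // Hypothetical merge rule, purely for illustration: copy the map
            // and file the joined list under a fixed key.
            Map<String, List<Integer>> result = new HashMap<>(map);
            result.put("joined", list);
            return result;
        };

    public static void main(String[] args) {
        Map<String, List<Integer>> left = new HashMap<>();
        left.put("a", Arrays.asList(1, 2));
        Map<String, List<Integer>> out = JOINER.apply(left, Arrays.asList(3, 4));
        System.out.println(out.keySet()); // contains "a" and "joined"
    }
}
```

Since this compiles and runs with mismatched V and V1 value types, the generic signature alone cannot be what the TopologyBuilderException is complaining about.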