Thanks for pointing this out. I am doing exactly this now and it is working fine.
Sachin

On Sun, Oct 9, 2016 at 12:32 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> You must ensure that both streams are co-partitioned (i.e., same number
> of partitions and partitioned on the join key).
>
> (see "Note" box:
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#joining-streams)
>
> You can enforce co-partitioning by introducing a call to .through()
> before doing the join (on either one or both input streams). You need to
> insert .through() for an input stream if you (potentially) modified
> the key (e.g., you applied .selectKey(), map(), or flatMap() before the
> join).
>
> If one stream's key is not modified, it is sufficient to re-distribute
> only the other stream via .through(). Also keep in mind that you should
> create the topic used in .through() manually, with the right number of
> partitions, before you start your application.
>
> -Matthias
>
> On 10/08/2016 08:54 AM, Sachin Mittal wrote:
> > I don't think that is the issue.
> > The join api says:
> > public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> > In my case V is Map<String, List<X>>
> > V1 is List<X>
> > R is Map<String, List<X>>
> > K is String
> >
> > Note the final V and V1 are arrived at after doing transformations on
> > the original streams
> > <String, Y>
> >
> > So there are intermediate steps like
> > stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
> > and
> > table.mapValues(new ValueMapper<X, Map<String, X>>())
> >
> > So whenever I modify the structure of a stream or table, do I need to
> > back it up with a new Kafka topic by calling through("new-mapped-topic")?
> >
> > Thanks
> > Sachin
> >
> > On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgai...@hotmail.com> wrote:
> >
> >>> From: sjmit...@gmail.com
> >>> Date: Sat, 8 Oct 2016 15:30:33 +0530
> >>> Subject: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
> >>> To: users@kafka.apache.org
> >>>
> >>> Hi,
> >>> I am getting this exception
> >>>
> >>> Exception in thread "main"
> >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> >>> building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009 are
> >>> not joinable
> >>>
> >> MG>look at join declaration for org.apache.kafka.streams.kstream.internals.KTableImpl.java
> >> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> >> MG>method join assumes 2 collections that exactly match the generic
> >> declaration of join method
> >>
> >> MG>KTable<String, Map<String, List<V1>>> != KTable<String, List<V1>> (2nd
> >> parameter is missing both V and R declarators)
> >> MG>you can establish a new collection of KTable<String, List<V1>>
> >>
> >> MG>and then *join* KTable<String, Map<String, List<V1>>> into
> >> KTable<String, List<V1>> thru custom join method
> >>
> >>> What I am trying to do is I aggregate a KStream into a KTable of type
> >>> KTable<String, Map<String, List<V>>>
> >>>
> >>> and I am trying to join it to another KStream which is aggregated into
> >>> another KTable of type
> >>> KTable<String, List<V>>
> >>>
> >>> Since the keys of both final KTables are the same, I don't understand
> >>> why it is giving this exception.
> >>>
> >>> Thanks
> >>> Sachin
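
For anyone reading this thread later, here is a minimal sketch of why the co-partitioning requirement matters. It deliberately uses no Kafka API. The class CoPartitioningSketch and its methods partitionFor and allKeysAligned are hypothetical names made up for illustration, and the hash is plain String.hashCode(), a stand-in and not Kafka's actual partitioner. It only shows two things: the same key can land on different partition numbers when two topics have different partition counts, and a record whose key was changed (e.g. by map()) still sits on the partition chosen for its old key until it is written out again, which is what the .through() step discussed in the thread takes care of.

```java
import java.util.Arrays;
import java.util.List;

final class CoPartitioningSketch {

    // Stand-in partitioner (assumption): hash the key and take it modulo the
    // partition count. Kafka's real partitioner hashes the serialized key
    // bytes differently; only the "hash mod partitions" idea is shown here.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns true only if every key maps to the same partition number in
    // both topics, i.e. the two topics are co-partitioned for these keys.
    static boolean allKeysAligned(List<String> keys, int leftPartitions, int rightPartitions) {
        for (String key : keys) {
            if (partitionFor(key, leftPartitions) != partitionFor(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f");

        // Same partition count on both sides: each key meets its counterpart.
        System.out.println("4 vs 4 aligned: " + allKeysAligned(keys, 4, 4)); // true

        // Different partition counts: some keys end up on different partitions.
        System.out.println("4 vs 3 aligned: " + allKeysAligned(keys, 4, 3)); // false

        // A record written with key "c" is stored on partitionFor("c", 4).
        // If a map() step changes its key to "d", it is still physically on
        // that partition, while the other join input keeps key "d" elsewhere.
        System.out.println("stored on partition:   " + partitionFor("c", 4)); // 3
        System.out.println("expected on partition: " + partitionFor("d", 4)); // 0
    }
}
```

With equal partition counts and unchanged keys the two sides line up. Once either the count or the key differs they no longer do, which matches the advice above to repartition the re-keyed stream through a topic that was created with the right number of partitions.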