You must ensure that both streams are co-partitioned (i.e., they have
the same number of partitions and are both partitioned on the join key).

(see "Note" box:
http://docs.confluent.io/3.0.1/streams/developer-guide.html#joining-streams)
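
To illustrate why both conditions matter, here is a minimal, self-contained
sketch in plain Java (no Kafka dependency). It assumes that a record's
partition is chosen as hash(key) modulo the partition count, and that a
join only sees records sitting in the same partition number of both
inputs. The class and method names are made up for this example, and
String.hashCode() is only a stand-in for the murmur2 hash that Kafka's
default partitioner applies to the serialized key.

```java
public class CoPartitioningSketch {

    // Stand-in for the producer's partitioner: hash the key and take it
    // modulo the partition count. (Assumption: String.hashCode() replaces
    // Kafka's murmur2 hash of the serialized key bytes.)
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Under the assumption above, two records can only be joined if they
    // land in the same partition number of their respective topics.
    static boolean samePartition(String leftKey, int leftPartitions,
                                 String rightKey, int rightPartitions) {
        return partitionFor(leftKey, leftPartitions)
                == partitionFor(rightKey, rightPartitions);
    }

    public static void main(String[] args) {
        String joinKey = "d";

        // Same partition count, both sides written with the join key.
        System.out.println("co-partitioned:            "
                + samePartition(joinKey, 4, joinKey, 4));

        // Same key, but the topics have different partition counts.
        System.out.println("different partition count: "
                + samePartition(joinKey, 4, joinKey, 6));

        // Same partition count, but the left record was written under its
        // old key "e" before the key was changed to the join key.
        System.out.println("left keyed by old key:     "
                + samePartition("e", 4, joinKey, 4));
    }
}
```

Only the first case puts both records in the same partition. The other two
are what writing the data back to a topic with the right partition count
and the new key (as .through() does) is meant to repair.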

You can enforce co-partitioning by inserting a call to .through()
before the join (on one or both input streams). An input stream needs a
.through() if you have (potentially) modified its key, e.g., by
applying .selectKey(), .map(), or .flatMap() before the join.

If one stream's key is not modified, it is sufficient to re-distribute
only the other stream via .through(). Also keep in mind that you should
create the topic used in .through() manually, with the right number of
partitions, before you start your application.


-Matthias

On 10/08/2016 08:54 AM, Sachin Mittal wrote:
> I don't think that is the issue.
> The join api says:
> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R>
> joiner)
> In my case V is Map<String, List<X>>
> V1 is List<X>
> R is Map<String, List<X>>
> K is String
> 
> Note the final V and V1 are arrived after doing transformation on original
> streams
> <String, Y>
> 
> So there are intermediate steps like
> stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
> and
> table.mapValues(new ValueMapper<X, Map<String,X>>()
> 
> So whenever I modify the structure of a stream or table do I need to back
> it up with a new kafka topic calling through("new-mapped-topic") ?
> 
> Thanks
> Sachin
> 
> 
> 
> 
> On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgai...@hotmail.com> wrote:
> 
>>
>>
>>
>>> From: sjmit...@gmail.com
>>> Date: Sat, 8 Oct 2016 15:30:33 +0530
>>> Subject: Understanding
>>> org.apache.kafka.streams.errors.TopologyBuilderException
>>> To: users@kafka.apache.org
>>>
>>> Hi,
>>> I am getting this exception
>>>
>>> Exception in thread "main"
>>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
>>> building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009 are
>>> not joinable
>>>
>> MG>look at join declaration for
>> org.apache.kafka.streams.kstream.internals.KTableImpl.java
>> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V,
>> V1, R> joiner)
>> MG>method join assumes 2 collections that exactly match the generic
>> declaration of join method
>>
>> MG>KTable<String, Map<String, List<V1>>> !=  KTable<String, List<V1>> (2nd
>> parameter is missing both V and R declarators)
>> MG>you can establish a new collection of KTable<String, List<V1>>
>>
>> MG>and then *join* KTable<String, Map<String, List<V1>>>  into
>> KTable<String, List<V1>>  thru custom join method
>>
>>> What I am trying to do is I aggregate a KStream into a KTable of type
>>> KTable<String, Map<String, List<V>>>
>>>
>>> and I am trying to join it to another KStream which is aggregated into
>>> another KTable of type
>>>  KTable<String, List<V>>
>>>
>>> Since keys of both the final KTable are same, I don't understand why it is
>>> giving this exception.
>>>
>>> Thanks
>>> Sachin
>>
>>
> 
