Did you specify same number of partitions for the two input topics you are
joining? I think that this is usually the first thing people ask to verify
with errors similar to yours.

If you are experimenting with learning some concepts, it is simpler to
always use one partition for your topics.
On 13 Jul 2016 7:40 p.m., "vivek thakre" <vivek.tha...@gmail.com> wrote:

> Hello,
>
> I want to join 2 Topics (KStreams)
>
>
> Stream 1
> Topic :  userIdClicks
> Key : userId
> Value : JSON String with event details
>
> Stream 2
> Topic :  userIdChannel
> Key : userId
> Value : JSON String  with event details and has channel value
>
> I could not find any examples with KStream-to-KStream Join.
>
> Here is my code
>
> //build stream userIdClicks
> > KStream<String, Long> userClickStream = builder.stream(stringSerde,
> stringSerde,
> > "userClicks");
> >
>
>
> > //create stream -> < userId, 1 (count) >
> > KStream<String, Long> *userClickCountStream* = userClickStream.filter((
> > userId,record)-> userId != null) .map((userId,record) -> new KeyValue<>(
> > userId,1l));
> >
>
>
> > //build stream userChannelStream
> > KStream<String, String> userEventStream = builder.stream(stringSerde,
> > stringSerde, "userEvents");
> >
>
>
> > //create stream <userId, channel> : extract channel value from json
> string
> > KStream<String, String> *userChannelStream* =  userEventStream
> >                 .filter((userId,record)-> userId != null)
> >                 .map((userId,record) -> new KeyValue<>(userId
> > ,JsonPath.read(record, "$.event.page.channel").toString()));
> >
>
>
> > //join *userClickCountStream* with
> > *userChannelStream*KTable<String, Long> clicksPerChannel =
> > userClickCountStream
> >         .join(userChannelStream, new ValueJoiner<Long, String,
> > ChannelWithClicks>() {
> >              @Override
> >              public ChannelWithClicks apply(Long clicks, String channel)
> {
> >                 return new ChannelWithClicks(channel == null ? "UNKNOWN"
> > : channel, clicks);
> >              }
> >          },
> JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000))
> > //30 secs before and after
> >         .map((user, channelWithClicks) -> new
> KeyValue<>(channelWithClicks
> > .getChannel(), channelWithClicks.getClicks()))
> >         .reduceByKey(
> >                     (firstClicks, secondClicks) -> firstClicks +
> > secondClicks,
> >                      stringSerde, longSerde, "ClicksPerChannelUnwindowed"
> > );
>
> When I run this topology, I get an exception
>
> Invalid topology building: KSTREAM-MAP-0000000003 and
> KSTREAM-MAP-0000000006 are not joinable
>
> I looking for a way to join 2 KStreams.
>
> Thanks,
>
> Vivek
>

Reply via email to