Yes, there are same number of partitions to both the topic, also same
partition key i.e userId
If I just join the streams without applying the map functions (in this
case userClickStream
and userEventSrtream) , it works.

Thanks,
Vivek


On Wed, Jul 13, 2016 at 4:53 PM, Philippe Derome <phder...@gmail.com> wrote:

> Did you specify same number of partitions for the two input topics you are
> joining? I think that this is usually the first thing people ask to verify
> with errors similar to yours.
>
> If you are experimenting with learning some concepts, it is simpler to
> always use one partition for your topics.
> On 13 Jul 2016 7:40 p.m., "vivek thakre" <vivek.tha...@gmail.com> wrote:
>
> > Hello,
> >
> > I want to join 2 Topics (KStreams)
> >
> >
> > Stream 1
> > Topic :  userIdClicks
> > Key : userId
> > Value : JSON String with event details
> >
> > Stream 2
> > Topic :  userIdChannel
> > Key : userId
> > Value : JSON String  with event details and has channel value
> >
> > I could not find any examples with KStream-to-KStream Join.
> >
> > Here is my code
> >
> > //build stream userIdClicks
> > > KStream<String, Long> userClickStream = builder.stream(stringSerde,
> > stringSerde,
> > > "userClicks");
> > >
> >
> >
> > > //create stream -> < userId, 1 (count) >
> > > KStream<String, Long> *userClickCountStream* = userClickStream.filter((
> > > userId,record)-> userId != null) .map((userId,record) -> new
> KeyValue<>(
> > > userId,1l));
> > >
> >
> >
> > > //build stream userChannelStream
> > > KStream<String, String> userEventStream = builder.stream(stringSerde,
> > > stringSerde, "userEvents");
> > >
> >
> >
> > > //create stream <userId, channel> : extract channel value from json
> > string
> > > KStream<String, String> *userChannelStream* =  userEventStream
> > >                 .filter((userId,record)-> userId != null)
> > >                 .map((userId,record) -> new KeyValue<>(userId
> > > ,JsonPath.read(record, "$.event.page.channel").toString()));
> > >
> >
> >
> > > //join *userClickCountStream* with
> > > *userChannelStream*KTable<String, Long> clicksPerChannel =
> > > userClickCountStream
> > >         .join(userChannelStream, new ValueJoiner<Long, String,
> > > ChannelWithClicks>() {
> > >              @Override
> > >              public ChannelWithClicks apply(Long clicks, String
> channel)
> > {
> > >                 return new ChannelWithClicks(channel == null ?
> "UNKNOWN"
> > > : channel, clicks);
> > >              }
> > >          },
> > JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000))
> > > //30 secs before and after
> > >         .map((user, channelWithClicks) -> new
> > KeyValue<>(channelWithClicks
> > > .getChannel(), channelWithClicks.getClicks()))
> > >         .reduceByKey(
> > >                     (firstClicks, secondClicks) -> firstClicks +
> > > secondClicks,
> > >                      stringSerde, longSerde,
> "ClicksPerChannelUnwindowed"
> > > );
> >
> > When I run this topology, I get an exception
> >
> > Invalid topology building: KSTREAM-MAP-0000000003 and
> > KSTREAM-MAP-0000000006 are not joinable
> >
> > I looking for a way to join 2 KStreams.
> >
> > Thanks,
> >
> > Vivek
> >
>

Reply via email to