Sachin,

Just a side note in addition to what Matthias mentioned: the upcoming
0.10.1.0 release adds auto-repartitioning to Kafka Streams, which detects
whether the message keys of the join inputs may have been modified (and
hence whether the inputs are still joinable). To give a few examples:

-------

stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2");

// Streams will check that topic1 and topic2 are co-partitioned
// (i.e. have the same number of partitions) and error out if they are not.
stream1.join(stream2, ...);


-------

stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2").selectKey(...);

// Streams will assume that stream2 is no longer co-partitioned with stream1,
// since its key may have changed, and hence will auto-repartition stream2.
// In other words, it is rewritten behind the scenes as:
stream1.join(stream2, ...);


stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2").selectKey(...).through("repartition-topic");
// "repartition-topic" is created on the fly with the same number of
// partitions as topic1.

stream1.join(stream2, ...);


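To make the second and third examples above concrete, here is a minimal,
self-contained sketch against the 0.10.1.0 API (topic names, serdes, the
re-keying logic, and the joiner are all made up for illustration, and the
exact join / JoinWindows overloads may differ slightly between versions):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class AutoRepartitionJoinExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "auto-repartition-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes, used e.g. for the internally created repartition topic.
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> stream1 =
            builder.stream(Serdes.String(), Serdes.String(), "topic1");

        // selectKey() marks stream2's key as (potentially) modified, so the
        // join below makes Streams insert a repartition step automatically.
        KStream<String, String> stream2 =
            builder.stream(Serdes.String(), Serdes.String(), "topic2")
                   .selectKey((key, value) -> value);   // made-up re-keying logic

        // Windowed stream-stream join; the joiner simply concatenates values.
        KStream<String, String> joined = stream1.join(
            stream2,
            (v1, v2) -> v1 + "," + v2,
            JoinWindows.of(60 * 1000L),
            Serdes.String(), Serdes.String(), Serdes.String());

        joined.to(Serdes.String(), Serdes.String(), "joined-output");

        new KafkaStreams(builder, props).start();
    }
}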

Stay tuned for the release docs; such new features will be described in the
upgrade / API changes section.

Guozhang

On Sun, Oct 9, 2016 at 12:20 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Thanks for pointing this out.
> I am doing it exactly like this now and it is working fine.
>
> Sachin
>
>
> On Sun, Oct 9, 2016 at 12:32 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > You must ensure that both streams are co-partitioned (i.e., same number
> > of partitions and partitioned on the join key).
> >
> > (see "Note" box:
> > http://docs.confluent.io/3.0.1/streams/developer-guide.
> > html#joining-streams)
> >
> > You can enforce co-partitioning by introducing a call to .through()
> > before doing the join (on either one or both input streams). You need to
> > insert .through() for an input stream if you did (potentially) modify
> > the key (e.g., you applied .selectKey(), map(), or flatMap() before the
> > join).
> >
> > If one stream's key is not modified, it is sufficient to only
> > re-distribute the other stream via .through(). Also keep in mind that
> > you should create the topic used in .through() manually, with the right
> > number of partitions, before you start your application.
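> >
> > For illustration, a minimal sketch of that pattern (topic names, serdes,
> > and the re-keying logic are made up; "my-repartition-topic" must be
> > created up front with the same partition count as the other join input):
> >
> > KStreamBuilder builder = new KStreamBuilder();
> >
> > KStream<String, String> rekeyed =
> >     builder.stream(Serdes.String(), Serdes.String(), "input-b")
> >            .selectKey((k, v) -> v)            // key (potentially) modified
> >            .through("my-repartition-topic");  // re-distribute on the new key
> >
> > // rekeyed is now co-partitioned with the other input and can be joined
> > // (or aggregated into a KTable and then joined) as usual.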
> >
> >
> > -Matthias
> >
> > On 10/08/2016 08:54 AM, Sachin Mittal wrote:
> > > I don't think that is the issue.
> > > The join API says:
> > > public <V1, R> KTable<K, R> join(KTable<K, V1> other,
> > >                                  ValueJoiner<V, V1, R> joiner)
> > > In my case V is Map<String, List<X>>
> > > V1 is List<X>
> > > R is Map<String, List<X>>
> > > K is String
> > >
> > > Note that the final V and V1 are arrived at after doing transformations
> > > on the original streams of type <String, Y>.
> > >
> > > So there are intermediate steps like
> > > stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
> > > and
> > > table.mapValues(new ValueMapper<X, Map<String, X>>())
> > >
> > > So whenever I modify the structure of a stream or table, do I need to
> > > back it with a new Kafka topic by calling through("new-mapped-topic")?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > > On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgai...@hotmail.com>
> > wrote:
> > >
> > >>
> > >>
> > >>
> > >>> From: sjmit...@gmail.com
> > >>> Date: Sat, 8 Oct 2016 15:30:33 +0530
> > >>> Subject: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
> > >>> To: users@kafka.apache.org
> > >>>
> > >>> Hi,
> > >>> I am getting this exception
> > >>>
> > >>> Exception in thread "main"
> > >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> > >>> building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009
> > >>> are not joinable
> > >>>
> > >> MG> look at the join declaration in
> > >> MG> org.apache.kafka.streams.kstream.internals.KTableImpl.java:
> > >> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other,
> > >> MG>                                  ValueJoiner<V, V1, R> joiner)
> > >> MG> method join assumes 2 collections that exactly match the generic
> > >> MG> declaration of the join method
> > >>
> > >> MG> KTable<String, Map<String, List<V1>>> != KTable<String, List<V1>>
> > >> MG> (the 2nd parameter is missing both the V and R declarators)
> > >> MG> you can establish a new collection of KTable<String, List<V1>>
> > >>
> > >> MG> and then *join* KTable<String, Map<String, List<V1>>> into
> > >> MG> KTable<String, List<V1>> through a custom join method
> > >>
> > >>> What I am trying to do is I aggregate a KStream into a KTable of type
> > >>> KTable<String, Map<String, List<V>>>
> > >>>
> > >>> and I am trying to join it to another KStream which is aggregated into
> > >>> another KTable of type
> > >>> KTable<String, List<V>>
> > >>>
> > >>> Since the keys of both final KTables are the same, I don't understand
> > >>> why it is giving this exception.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang
