Guozhang,

"1) if the coming record's key is null, then when it flows into the join
processor inside the topology this record will be dropped as it cannot be
joined with any records from the other stream."

Can you please elaborate on the notion of a key? By keys, do you mean Kafka
partition keys (the key of each Kafka record)?
For a JSON KStream-to-KTable join example, can you please show me a sample
input?

For me, the KTable has:

{"user_name": "Joe", "location": "US", "gender": "male"}
{"user_name": "Julie", "location": "US", "gender": "female"}
{"user_name": "Kawasaki", "location": "Japan", "gender": "male"}

The KStream gets an event:

{"user": "Joe", "custom": {"choice":"vegan"}}

Is this data right, or does each record need a key followed by the JSON
value, as in:

"joe", {"user_name": "Joe", "location": "US", "gender": "male"}

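To make the question concrete, here is a plain-Java sketch of how I understand keyed records would be matched in a stream-to-table left join. It uses no Kafka classes at all: KeyedJoinSketch and leftJoin are names I made up for illustration, the table is modelled as a Map from key to JSON string, and the lowercase key "joe" is my assumption about what the key would look like.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed stream-to-table left join; not the Kafka Streams API.
final class KeyedJoinSketch {

    // Looks up each (key, value) stream record in the table by its record key.
    // Records with a null key are skipped, mirroring the drop described in 1).
    static List<String> leftJoin(List<Map.Entry<String, String>> stream,
                                 Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> record : stream) {
            if (record.getKey() == null) {
                continue; // null key: nothing to look up, so the record is dropped
            }
            // Left join: the table side is null when no row has this key.
            String tableValue = table.get(record.getKey());
            out.add(record.getKey() + " -> " + record.getValue() + " + " + tableValue);
        }
        return out;
    }

    public static void main(String[] args) {
        // Table side: key "joe" (assumed), value is the JSON document.
        Map<String, String> table = new HashMap<>();
        table.put("joe", "{\"user_name\": \"Joe\", \"location\": \"US\", \"gender\": \"male\"}");

        // Stream side: one keyed event and one event without a key.
        List<Map.Entry<String, String>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>("joe", "{\"user\": \"Joe\", \"custom\": {\"choice\":\"vegan\"}}"));
        stream.add(new SimpleEntry<>(null, "{\"user\": \"Julie\"}"));

        for (String joined : leftJoin(stream, table)) {
            System.out.println(joined);
        }
    }
}
```

If this picture is right, the "user" field inside the JSON value plays no part in the match; only the record key does, which is why I am asking whether the key has to be supplied separately.
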



On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> I think your issue is twofold:
>
> 1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream.
>
> 2) the NPE you are getting when giving it the non-null keyed record seems
> to occur because you are using "SnowServerDeserialzer" (is it set as the
> default key deserializer?), which expects a SnowServerPOJOClass, while the
> key "joe" is typed String. You need to override the key deserializer when
> constructing the "cache" KTable as well:
>
> ----------------
> KTable<String, CachePOJOClass> cache = builder.table(Serdes.String(),
>         rawSerde, "cache", "local-cache");
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Guozhang
> >
> > I am using 0.10.2.1 version
> >
> > - Shekar
> >
> > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > Could you show your input data? More specifically, what are the key
> > > types of your input streams, and are the keys non-null? It seems the
> > > root cause is similar to the other thread you asked on the mailing
> > > list.
> > >
> > > Also, could you tell us which Kafka Streams version you are using?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctip...@gmail.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > I am having trouble implementing a stream-to-table join.
> > > >
> > > > I have two POJOs, one representing the stream data structure and one
> > > > the table data structure. The raw topic contains the stream records
> > > > and the cache topic contains the table records. The join does not
> > > > seem to happen, since the print statement is never called.
> > > >
> > > > Appreciate any pointers.
> > > >
> > > > - Shekar
> > > >
> > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > > CachePOJOClass,RawPOJOClass>() {
> > > >
> > > >     @Override
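> > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > >         // leftJoin passes c == null when no table row matches the key.
> > > >         if (c == null) {
> > > >             return r;
> > > >         }
> > > >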
> > > >         String src=r.getSource();
> > > >         String cSrc=c.getSnowHost();
> > > >         Custom custom=new Custom();
> > > >
> > > >         if (src.matches(snowSrc)){
> > > >             System.out.println("In apply code");
> > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > >             r.setCustom(custom);
> > > >         }
> > > >         return r;
> > > >     }
> > > > }).to("parser");
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
