Hi Guozhang,

Yes, I tried applying the filter to the KTable produced by the join, and
then the foreach to the KTable produced by the filter. The nulls still
reached my foreach.

It is easy to work around, but the behaviour was especially surprising
given that the filter didn't prevent it.
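One way to make sense of this is a small plain-Kotlin model of table-style versus stream-style filtering. It is a sketch under stated assumptions, not Kafka's implementation: it assumes, as observed in this thread, that a KTable filter forwards a tombstone without calling the Predicate and turns a rejected value into a tombstone, rather than dropping either. The names `Update`, `tableFilter` and `dropTombstones` are hypothetical. The stream-style function corresponds roughly to the usual workaround of converting the join result with `toStream()` and filtering out nulls there, e.g. `timesheetStatus.toStream().filter { _, status -> status != null }.foreach { ... }`.

```kotlin
// Toy model in plain Kotlin (no Kafka dependency). It mirrors the behaviour
// reported in this thread, not Kafka's actual source code.

// One changelog update: a key plus its new value; null means "deleted" (tombstone).
data class Update<V : Any>(val key: String, val value: V?)

// Table-style filter (assumed semantics): it never drops an update, it only
// rewrites it. A tombstone passes through without calling the predicate, and
// a value that fails the predicate becomes a tombstone for that key.
fun <V : Any> tableFilter(
    updates: List<Update<V>>,
    predicate: (String, V) -> Boolean
): List<Update<V>> =
    updates.map { u ->
        val v = u.value
        if (v != null && predicate(u.key, v)) u else Update<V>(u.key, null)
    }

// Stream-style filter (the toStream() + filter workaround): tombstones are
// dropped entirely, so the downstream action only ever sees non-null values.
fun <V : Any> dropTombstones(updates: List<Update<V>>): List<Pair<String, V>> =
    updates.mapNotNull { u -> u.value?.let { u.key to it } }
```

Fed a changelog of two tombstones followed by `"submitting"`, `tableFilter` still returns all three updates (two of them null) and calls the predicate only once, whereas `dropTombstones` returns just the `"submitting"` entry.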

Mathieu


On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Mathieu,
>
> As Matthias said, we are working on improving the current join semantics:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
>
> and will keep you updated.
>
>
> As for KTable.filter(), I think it can actually achieve what you want: not
> forwarding nulls to the downstream operators. Have you tried it and found
> that it does not work?
>
>
> Guozhang
>
>
>
> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hm... OK, I think that makes sense.
> >
> > It seems like I can't filter out those tombstone records; is that expected
> > as well? If I throw in a .filter operation before my foreach, its
> > Predicate is not invoked, and the foreach's ForeachAction is still invoked
> > with a null value.
> >
> > Mathieu
> >
> >
> > On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Hi Mathieu,
> > >
> > > Join semantics are tricky. We are still working on better
> > > documentation for them...
> > >
> > > For the current state and your question:
> > >
> > > Each time a record is processed, it looks up the other KTable to see if
> > > there is a matching record. If none is found, the join result is empty
> > > and a tombstone record <key:null> is sent downstream. This happens to
> > > delete any (possibly existing) previous join result for this key; keep
> > > in mind that the result is a KTable containing the current state of
> > > the join.
> > >
> > > This happens both ways; thus, if the first records of each stream do
> > > not match on the key, both result in a <key:null> message to delete
> > > possibly existing join tuples in the result KTable.
> > >
> > > Does this make sense to you?
> > >
> > > -Matthias
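The semantics Matthias describes can be sketched as a toy model in plain Kotlin. It is a simplification built on assumptions: only non-null updates are modelled, keys are strings, and `InnerTableJoin`, `updateLeft` and `updateRight` are hypothetical names rather than Kafka Streams API. Each side keeps its latest value per key; every update looks up the other side, runs the joiner on a match, and otherwise forwards a `<key:null>` tombstone.

```kotlin
// Toy model in plain Kotlin (no Kafka dependency) of inner KTable-KTable join
// semantics as described in this thread. Simplified: only non-null updates are
// modelled, and keys are plain strings.
class InnerTableJoin<L : Any, R : Any, J : Any>(private val joiner: (L, R) -> J) {
    private val left = mutableMapOf<String, L>()   // latest value per key, left table
    private val right = mutableMapOf<String, R>()  // latest value per key, right table

    // Number of times the joiner ran (the "EXECUTING ValueJoiner" lines).
    var joinerCalls = 0
        private set

    // Runs the joiner only when both sides have a value; otherwise returns
    // null, i.e. a tombstone that deletes any earlier join result for the key.
    private fun joined(l: L?, r: R?): J? =
        if (l != null && r != null) {
            joinerCalls++
            joiner(l, r)
        } else {
            null
        }

    // An update on the left table looks up the right table, and vice versa.
    // The returned pair is the record forwarded downstream (e.g. to foreach).
    fun updateLeft(key: String, value: L): Pair<String, J?> {
        left[key] = value
        return key to joined(value, right[key])
    }

    fun updateRight(key: String, value: R): Pair<String, J?> {
        right[key] = value
        return key to joined(left[key], value)
    }
}
```

For example, two updates to the left table for a key, followed by one update to the right table for the same key, forward a tombstone, a tombstone, and then the joined value, with the joiner running only once. That input sequence is hypothetical; it only shows how two nulls can reach a foreach before the first ValueJoiner call.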
> > >
> > > On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > > > Hello Kafka users,
> > > >
> > > > I'm seeing some unexpected results when using Kafka Streams, and I was
> > > > hoping someone could explain them to me. I have two streams, which I've
> > > > converted KStream->KTable, and then I am joining them together with a
> > > > "join" (not an outer join, not a full join). With the resulting KTable
> > > > from the join, I am performing a foreach.
> > > >
> > > > When I start up my Streams application, my foreach receives two records
> > > > with valid keys but null values *before* my ValueJoiner ever gets
> > > > executed. Why would that be?
> > > >
> > > > Code excerpt; please excuse the use of Kotlin here:
> > > >
> > > > val builder = KStreamBuilder()
> > > >
> > > > val approvalStatus = builder.table(
> > > >         Serdes.String(),
> > > >         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > > >         "TimesheetApprovalStatusChanged"
> > > > )
> > > >
> > > > val timesheetLastApprovalAction = builder.table(
> > > >         Serdes.String(),
> > > >         JsonSerde(Map::class.java),
> > > >         "TimesheetApprovalActionPerformed"
> > > > )
> > > >
> > > > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> > > >         approvalStatus, lastApprovalAction ->
> > > >     println("EXECUTING ValueJoiner")
> > > >     computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > > >             lastApprovalAction as Map<String, Any?>)
> > > > })
> > > >
> > > > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > > >     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > > >     if (timesheetStatus == null) {
> > > >         println("SKIPPING NULL I DON'T UNDERSTAND")
> > > >     }
> > > > })
> > > >
> > > >
> > > > Resulting console output:
> > > >
> > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > EXECUTING ValueJoiner
> > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > > >
> > > >
> > > > Any explanation of why the foreach would be executing for data that
> > > > hasn't been generated by my join?
> > > >
> > > > Thanks,
> > > >
> > > > Mathieu
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
