Hi Mathieu,

As Matthias said, we are working on improving the current join semantics:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287

and will keep you updated.
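
To make the current behavior Matthias describes concrete, here is a minimal in-memory model in Kotlin, the language of your excerpt. It is a sketch, not the Kafka Streams API or its implementation. The class InnerJoinModel and its methods are hypothetical, and deletes on either input are ignored for brevity. Each update looks up the other side. When there is no match, it emits a tombstone (a null value) instead of calling the joiner.

```kotlin
// Hypothetical in-memory model of KTable-KTable inner-join update semantics.
// Not the Kafka Streams API; input deletes are not modeled.
class InnerJoinModel<K, L, R, V>(private val joiner: (L, R) -> V) {
    private val left = mutableMapOf<K, L>()
    private val right = mutableMapOf<K, R>()

    // Every (key, value) pair sent downstream; a null value is a tombstone.
    val output = mutableListOf<Pair<K, V?>>()

    fun updateLeft(key: K, value: L) {
        left[key] = value
        val other = right[key]
        // No matching record on the other side: emit a tombstone so that any
        // previous join result for this key is deleted downstream.
        output += key to (if (other == null) null else joiner(value, other))
    }

    fun updateRight(key: K, value: R) {
        right[key] = value
        val other = left[key]
        output += key to (if (other == null) null else joiner(other, value))
    }
}

fun main() {
    val join = InnerJoinModel<String, String, String, String> { l, r -> "$l+$r" }
    join.updateLeft("a", "L1")   // nothing on the right for "a" -> tombstone
    join.updateRight("b", "R1")  // nothing on the left for "b" -> tombstone
    join.updateRight("a", "R2")  // both sides present -> joiner runs
    println(join.output)         // [(a, null), (b, null), (a, L1+R2)]
}
```

The two tombstones arrive before the joiner ever runs. That matches the shape of the console output in your report, though this model does not claim to reproduce its exact cause.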


As for KTable.filter(), I think it can actually achieve what you want: not
forwarding nulls to the downstream operators. Have you tried it and found
that it does not work?
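
You reported that the filter's Predicate is not invoked for these tombstones. The goal itself is to keep null values away from the ForeachAction. Here is a plain-Kotlin sketch of that goal, operating on a list of updates rather than on a real KTable. The Update alias and the foreachNonNull function are hypothetical names for illustration only.

```kotlin
// Hypothetical stand-in for one changelog update; a null value is a tombstone.
typealias Update = Pair<String, String?>

// Invoke the action only for real values, skipping tombstones.
fun foreachNonNull(updates: List<Update>, action: (String, String) -> Unit) {
    for ((key, value) in updates) {
        if (value != null) action(key, value)
    }
}

fun main() {
    val updates: List<Update> = listOf(
        "timesheet-1" to null,
        "timesheet-1" to null,
        "timesheet-1" to "urn:replicon:timesheet-status:submitting"
    )
    // Only the third update reaches the action.
    foreachNonNull(updates) { key, status -> println("$key, status: $status") }
}
```

In Kafka Streams itself, the analogous step would presumably be to convert the join result with `toStream()` and filter on `value != null` before the `foreach`. Treat that pairing as an untested assumption. Nothing in this thread confirms it.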


Guozhang



On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hm... OK, I think that makes sense.
>
> It seems like I can't filter out those tombstone records; is that expected
> as well? If I add a .filter operation before my foreach, its Predicate is
> not invoked, and the foreach's ForeachAction is still invoked with a null
> value.
>
> Mathieu
>
>
> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi Mathieu,
> >
> > Join semantics are tricky. We are still working on better
> > documentation for them...
> >
> > For the current state and your question:
> >
> > Each time a record is processed, it looks up the other KTable to see if
> > there is a matching record. If none is found, the join result is empty
> > and a tombstone record <key:null> is sent downstream. This is done to
> > delete any (possibly existing) previous join result for this key. Keep
> > in mind that the result is a KTable containing the current state of the
> > join.
> >
> > This happens in both directions. Thus, if the first records of each
> > stream do not match on the key, both result in a <key:null> message that
> > deletes any possibly existing join tuples in the result KTable.
> >
> > Does this make sense to you?
> >
> > -Matthias
> >
> > On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > > Hello Kafka users,
> > >
> > > I'm seeing some unexpected results when using Kafka Streams, and I was
> > > hoping someone could explain them to me. I have two streams, which I've
> > > converted from KStream to KTable, and then I am joining them together
> > > with a "join" (not an outer join, not a full join). On the KTable that
> > > results from the join, I am performing a foreach.
> > >
> > > When I start up my Streams application, my foreach receives two records
> > > with valid keys but null values *before* my ValueJoiner ever gets
> > > executed. Why would that be?
> > >
> > > Code excerpt; please excuse the use of Kotlin here:
> > >
> > > val builder = KStreamBuilder()
> > >
> > > val approvalStatus = builder.table(
> > >         Serdes.String(),
> > >         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > >         "TimesheetApprovalStatusChanged"
> > > )
> > >
> > > val timesheetLastApprovalAction = builder.table(
> > >         Serdes.String(),
> > >         JsonSerde(Map::class.java),
> > >         "TimesheetApprovalActionPerformed"
> > > )
> > >
> > > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> > >     approvalStatus, lastApprovalAction ->
> > >     println("EXECUTING ValueJoiner")
> > >     computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > >             lastApprovalAction as Map<String, Any?>)
> > > })
> > >
> > > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > >     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > >     if (timesheetStatus == null) {
> > >         println("SKIPPING NULL I DON'T UNDERSTAND")
> > >     }
> > > })
> > >
> > >
> > > Resulting console output:
> > >
> > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > SKIPPING NULL I DON'T UNDERSTAND
> > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > SKIPPING NULL I DON'T UNDERSTAND
> > > EXECUTING ValueJoiner
> > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > >
> > >
> > > Any explanation of why the foreach would be executing for data that
> > > hasn't been generated by my join?
> > >
> > > Thanks,
> > >
> > > Mathieu
> > >
> >
> >
>



-- 
-- Guozhang
