Hm... OK, I think that makes sense.
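
For reference, the semantics Matthias describes below can be written as a toy model in plain Kotlin. This is a sketch under stated assumptions, not the Kafka Streams implementation. `ToyTableJoin`, `updateLeft` and `updateRight` are hypothetical names, each table is a plain in-memory map, and deletes arriving on the input tables are not modeled. Every update on either side looks up the other side by key and sends one record downstream: the joined value, or a `null` tombstone when there is no match.

```kotlin
// Toy model of an inner KTable-KTable join; NOT the Kafka Streams source.
// Each side is kept as a key -> latest value map. Every update on either
// side looks up the other side by key and emits exactly one record downstream.
class ToyTableJoin<V1 : Any, V2 : Any, R>(private val joiner: (V1, V2) -> R) {
    private val left = HashMap<String, V1>()
    private val right = HashMap<String, V2>()

    // Everything sent downstream, in order: key to joined value, or key to
    // null (a tombstone) when the other side has no record for that key.
    val emitted = mutableListOf<Pair<String, R?>>()

    fun updateLeft(key: String, value: V1) {
        left[key] = value
        val other = right[key]
        // No match: the join result is empty, so a tombstone is emitted to
        // delete any previous join result for this key.
        emitted += key to (if (other == null) null else joiner(value, other))
    }

    fun updateRight(key: String, value: V2) {
        right[key] = value
        val other = left[key]
        emitted += key to (if (other == null) null else joiner(other, value))
    }
}
```

In this model, one record per side under different keys produces two tombstones. One record per side under the same key produces a tombstone first, and then the ValueJoiner runs on the second update.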

It seems like I can't filter out those tombstone records; is that expected
as well? If I add a .filter operation before my foreach, its Predicate is
not invoked, and the foreach's ForeachAction is still invoked with a null
value.
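
One plausible reading, sketched as a toy model in plain Kotlin with hypothetical function names (`filterTableUpdate`, `foreachSkippingTombstones`). This is not the Kafka Streams source. In a table, a `null` value means "delete this key", so a filter on a table cannot drop a tombstone without leaving a stale row downstream. In this model the filter forwards tombstones unchanged without calling the predicate, and it turns rejected values into tombstones as well. Under that assumption, the final consumer (here, the ForeachAction) has to skip `null` values itself.

```kotlin
// Toy model of filtering a table's changelog; NOT the Kafka Streams source.
// A null value is a tombstone meaning "delete this key".
fun <V : Any> filterTableUpdate(
    key: String,
    value: V?,
    predicate: (String, V) -> Boolean
): Pair<String, V?> = when {
    // Tombstone: forwarded unchanged and the predicate is never called, so
    // downstream copies of the table also delete the key.
    value == null -> key to null
    // Value passes the predicate: forwarded as-is.
    predicate(key, value) -> key to value
    // Value fails the predicate: the key must disappear downstream too,
    // so it is forwarded as a tombstone.
    else -> key to null
}

// Downstream consumer of the filtered table: it still sees tombstones and
// has to skip them itself.
fun <V : Any> foreachSkippingTombstones(
    updates: List<Pair<String, V?>>,
    action: (String, V) -> Unit
) {
    for ((key, value) in updates) {
        if (value != null) action(key, value)
    }
}
```

Suppose a tombstone, a matching value and a non-matching value are fed through `filterTableUpdate`. The predicate is called only for the two non-null values, and `foreachSkippingTombstones` passes only the matching value to its action.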

Mathieu


On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi Mathieu,
>
> Join semantics are tricky. We are still working on better
> documentation for them...
>
> As for the current state and your question:
>
> Each time a record is processed, it looks up the other KTable to see if
> there is a matching record. If none is found, the join result is empty
> and a tombstone record <key:null> is sent downstream. This is done to
> delete any possibly existing previous join result for this key. Keep in
> mind that the result is a KTable containing the current state of the
> join.
>
> This happens in both directions. Thus, if the first records of your two
> streams do not match on the key, each of them results in a <key:null>
> message that deletes any possibly existing join tuple in the result KTable.
>
> Does this make sense to you?
>
> -Matthias
>
> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > Hello Kafka users,
> >
> > I'm seeing some unexpected results when using Kafka Streams, and I was
> > hoping someone could explain them to me.  I have two streams, which I've
> > converted KStream->KTable, and then I am joining them together with a
> > "join" (not an outer join, not a full join).  With the resulting KTable
> > from the join, I am performing a foreach.
> >
> > When I start up my Streams application, my foreach receives two records
> > with valid keys but null values *before* my ValueJoiner ever gets
> > executed. Why would that be?
> >
> > Code excerpt; please excuse the use of Kotlin here:
> >
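> > import org.apache.kafka.common.serialization.Serdes
> > import org.apache.kafka.streams.kstream.KStreamBuilder
> > // JsonSerde and computeTimesheetStatus are assumed to be defined elsewhere in the application.
> >
> > val builder = KStreamBuilder()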
> >
> > val approvalStatus = builder.table(
> >         Serdes.String(),
> >         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> >         "TimesheetApprovalStatusChanged"
> > )
> >
> > val timesheetLastApprovalAction = builder.table(
> >         Serdes.String(),
> >         JsonSerde(Map::class.java),
> >         "TimesheetApprovalActionPerformed"
> > )
> >
> > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> >     approvalStatus, lastApprovalAction ->
> >     println("EXECUTING ValueJoiner")
> >     computeTimesheetStatus(approvalStatus.approvalStatus!!,
> >             lastApprovalAction as Map<String, Any?>)
> > })
> >
> > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> >     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> >     if (timesheetStatus == null) {
> >         println("SKIPPING NULL I DON'T UNDERSTAND")
> >     }
> > })
> >
> >
> > Resulting console output:
> >
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > SKIPPING NULL I DON'T UNDERSTAND
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > SKIPPING NULL I DON'T UNDERSTAND
> > EXECUTING ValueJoiner
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> >
> >
> > Any explanation of why the foreach would be executing for data that
> > hasn't been generated by my join?
> >
> > Thanks,
> >
> > Mathieu
> >
>
>
