Hm... OK, I think that makes sense. It seems like I can't filter out those tombstone records; is that expected as well? If I throw in a .filter operation before my foreach, its Predicate is not invoked, and the foreach's ForeachAction is still invoked with a null value.
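The behaviour reported here is consistent with a table-level filter. On a KTable, a null value is a deletion, so a filter cannot drop it without risking a stale row downstream. Below is a toy model in plain Kotlin, not the Kafka Streams API. It assumes a table filter forwards nulls without calling the predicate and turns values that fail the predicate into nulls. `Update`, `filterTable`, `filterStream` and `materialize` are hypothetical names used only for illustration.

```kotlin
// Toy model of table vs. stream filtering; NOT the Kafka Streams API.
// Assumption: a null value in a table changelog is a tombstone ("delete key").

// Hypothetical record type: one update in a changelog.
data class Update(val key: String, val value: String?)

// Table-style filter (assumed semantics):
// - tombstones are forwarded without calling the predicate;
// - values failing the predicate become tombstones, because the key may
//   have passed earlier and downstream must be told to delete it.
fun filterTable(
    updates: List<Update>,
    predicate: (String, String) -> Boolean
): List<Update> = updates.map { u ->
    val v = u.value
    if (v != null && predicate(u.key, v)) u else Update(u.key, null)
}

// Stream-style filter: each record is an independent event, so any record,
// including one with a null value, can simply be dropped.
fun filterStream(
    updates: List<Update>,
    predicate: (String, String?) -> Boolean
): List<Update> = updates.filter { predicate(it.key, it.value) }

// Replays a changelog into its final table state (tombstones delete keys).
fun materialize(updates: List<Update>): Map<String, String> {
    val state = mutableMapOf<String, String>()
    for (u in updates) {
        val v = u.value
        if (v == null) {
            state.remove(u.key)
        } else {
            state[u.key] = v
        }
    }
    return state
}

fun main() {
    var predicateCalls = 0
    val isApproved = { _: String, v: String -> predicateCalls++; v == "approved" }

    val changelog = listOf(Update("k", "approved"), Update("k", null))
    val filtered = filterTable(changelog, isApproved)

    println(filtered)              // the tombstone is still there
    println(predicateCalls)        // 1: the predicate never saw the null
    println(materialize(filtered)) // {}: the delete reached the filtered table

    // Dropping the tombstone at table level would leave a stale row:
    println(materialize(changelog.filter { it.value != null })) // {k=approved}

    // Stream-level filtering of the same records does drop the null:
    println(filterStream(filtered) { _, v -> v != null })
}
```

If the goal is only to skip null join results, converting the table with `toStream()` and filtering the resulting KStream is one plausible option, since a stream filter treats each record as an independent event (`filterStream` in the sketch). This is an untested suggestion, not something confirmed in this thread.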
Mathieu

On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi Mathieu,
>
> Join semantics are tricky. We are still working on better
> documentation for it...
>
> For the current state and your question:
>
> Each time a record is processed, it looks up the other KTable to see if
> there is a matching record. If none is found, the join result is empty
> and a tombstone record with <key:null> is sent downstream. This happens
> to delete any (possibly existing) previous join result for this key --
> keep in mind that the result is a KTable containing the current state
> of the join.
>
> This happens both ways; thus, if your first records of each stream do
> not match on the key, both result in a <key:null> message to delete
> possibly existing join tuples in the result KTable.
>
> Does this make sense to you?
>
> -Matthias
>
> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > Hello Kafka users,
> >
> > I'm seeing some unexpected results when using Kafka Streams, and I was
> > hoping someone could explain them to me. I have two streams, which I've
> > converted KStream->KTable, and then I am joining them together with a
> > "join" (not an outer join, not a full join). With the resulting KTable
> > from the join, I am performing a foreach.
> >
> > When I start up my Streams application, my foreach receives two records
> > with valid keys but null values *before* my ValueJoiner ever gets
> > executed. Why would that be?
> >
> > Code excerpt; please excuse the use of Kotlin here:
> >
> > val builder = KStreamBuilder()
> >
> > val approvalStatus = builder.table(
> >     Serdes.String(),
> >     JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> >     "TimesheetApprovalStatusChanged"
> > )
> >
> > val timesheetLastApprovalAction = builder.table(
> >     Serdes.String(),
> >     JsonSerde(Map::class.java),
> >     "TimesheetApprovalActionPerformed"
> > )
> >
> > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> >     approvalStatus, lastApprovalAction ->
> >     println("EXECUTING ValueJoiner")
> >     computeTimesheetStatus(approvalStatus.approvalStatus!!,
> >         lastApprovalAction as Map<String, Any?>)
> > })
> >
> > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> >     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> >     if (timesheetStatus == null) {
> >         println("SKIPPING NULL I DON'T UNDERSTAND")
> >     }
> > })
> >
> > Resulting console output:
> >
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > SKIPPING NULL I DON'T UNDERSTAND
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > SKIPPING NULL I DON'T UNDERSTAND
> > EXECUTING ValueJoiner
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> >
> > Any explanation on why the foreach would be executing for data that
> > hasn't been generated by my join?
> >
> > Thanks,
> >
> > Mathieu
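The join semantics described in the reply can be sketched as a toy model in plain Kotlin. This is not the Kafka Streams implementation, and `Side`, `Input` and `TableJoin` are hypothetical names. Each update is stored on its own side, the other side is looked up, and either the joined value or a tombstone (null) is emitted for the key. The input order in `main` is an assumption chosen to reproduce the shape of the console output above (two nulls, then one ValueJoiner call); the thread does not say which records actually arrived.

```kotlin
// Toy model of an inner KTable-KTable join; NOT the Kafka Streams code.
// Semantics follow the mailing-list explanation: on every update, look up
// the other table; if there is no match, emit a tombstone (null value).

// Hypothetical types: which table an update belongs to, and the update.
enum class Side { LEFT, RIGHT }
data class Input(val side: Side, val key: String, val value: String?)

class TableJoin(private val joiner: (String, String) -> String) {
    private val left = mutableMapOf<String, String>()
    private val right = mutableMapOf<String, String>()

    // Applies one update and returns the (key, value) sent downstream.
    fun process(input: Input): Pair<String, String?> {
        val (own, other) =
            if (input.side == Side.LEFT) Pair(left, right) else Pair(right, left)
        val v = input.value
        if (v == null) {
            own.remove(input.key)
        } else {
            own[input.key] = v
        }
        val match = other[input.key]
        if (v == null || match == null) {
            // No join result: the tombstone deletes any previous result for the key.
            return input.key to null
        }
        // The joiner always receives (left value, right value).
        val joined = if (input.side == Side.LEFT) joiner(v, match) else joiner(match, v)
        return input.key to joined
    }
}

fun main() {
    var joinerCalls = 0
    val join = TableJoin { status, action ->
        joinerCalls++
        "$status/$action"
    }
    // Assumed arrival order: two left-side updates for "k" before the
    // right side has any value for "k", then one right-side update.
    val inputs = listOf(
        Input(Side.LEFT, "k", "submitted"),
        Input(Side.LEFT, "k", "submitting"),
        Input(Side.RIGHT, "k", "submit")
    )
    for (input in inputs) {
        println(join.process(input))
    }
    // prints (k, null), (k, null), (k, submitting/submit) on separate lines
    println("joiner calls: $joinerCalls") // joiner calls: 1
}
```

The model also shows the "both ways" case from the reply: a left update for one key followed by a right update for a different key produces two tombstones and no joiner call.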