Hi Mathieu,

As Matthias said, we are working on improving the current join semantics:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287

and will keep you updated.

As for KTable.filter(), I think it can actually achieve what you want: not
forwarding nulls to the downstream operators. Have you tried it and found
that it does not work?

Guozhang

On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Hm... OK, I think that makes sense.
>
> It seems like I can't filter out those tombstone records; is that expected
> as well? If I throw in a .filter operation before my foreach, its
> Predicate is not invoked, and the foreach's ForeachAction is still invoked
> with a null value.
>
> Mathieu
>
>
> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi Mathieu,
> >
> > Join semantics are tricky. We are still working on better
> > documentation for them...
> >
> > For the current state and your question:
> >
> > Each time a record is processed, it looks up the other KTable to see if
> > there is a matching record. If none is found, the join result is empty
> > and a tombstone record with <key:null> is sent downstream. This happens
> > to delete any (possibly existing) previous join result for this key;
> > keep in mind that the result is a KTable containing the current state
> > of the join.
> >
> > This happens both ways; thus, if the first records of your two streams
> > do not match on the key, both result in a <key:null> message to delete
> > possibly existing join tuples in the result KTable.
> >
> > Does this make sense to you?
> >
> > -Matthias
> >
> > On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > > Hello Kafka users,
> > >
> > > I'm seeing some unexpected results when using Kafka Streams, and I was
> > > hoping someone could explain them to me. I have two streams, which I've
> > > converted KStream->KTable, and then I am joining them together with a
> > > "join" (not an outer join, not a full join).
> > > With the resulting KTable from the join, I am performing a foreach.
> > >
> > > When I start up my Streams application, my foreach receives two records
> > > with valid keys but null values *before* my ValueJoiner ever gets
> > > executed. Why would that be?
> > >
> > > Code excerpt; please excuse the use of Kotlin here:
> > >
> > >     val builder = KStreamBuilder()
> > >
> > >     val approvalStatus = builder.table(
> > >         Serdes.String(),
> > >         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > >         "TimesheetApprovalStatusChanged"
> > >     )
> > >
> > >     val timesheetLastApprovalAction = builder.table(
> > >         Serdes.String(),
> > >         JsonSerde(Map::class.java),
> > >         "TimesheetApprovalActionPerformed"
> > >     )
> > >
> > >     val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> > >         approvalStatus, lastApprovalAction ->
> > >         println("EXECUTING ValueJoiner")
> > >         computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > >             lastApprovalAction as Map<String, Any?>)
> > >     })
> > >
> > >     timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > >         println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > >         if (timesheetStatus == null) {
> > >             println("SKIPPING NULL I DON'T UNDERSTAND")
> > >         }
> > >     })
> > >
> > >
> > > Resulting console output:
> > >
> > >     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > >     SKIPPING NULL I DON'T UNDERSTAND
> > >     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > >     SKIPPING NULL I DON'T UNDERSTAND
> > >     EXECUTING ValueJoiner
> > >     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > >
> > >
> > > Any explanation of why the foreach would be executing for data that
> > > hasn't been generated by my join?
> > >
> > > Thanks,
> > >
> > > Mathieu
> > >
> >
>

-- 
-- Guozhang
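To make the behavior Matthias describes concrete, here is a minimal Kotlin sketch of KTable-KTable inner-join semantics. It is a hypothetical in-memory model (the names InnerJoinModel, Emitted and demo are made up for illustration) and does not use the Kafka Streams API at all; it only shows why records with null values can reach a downstream foreach before any ValueJoiner has run. Each update on either side looks up the other side by key; if there is no match, a tombstone (key, null) is forwarded instead of calling the joiner.

```kotlin
// Hypothetical model of an inner join between two tables.
// NOT the Kafka Streams API: it only mimics the semantics described above.

data class Emitted(val key: String, val value: String?)

class InnerJoinModel(private val joiner: (String, String) -> String) {
    private val left = mutableMapOf<String, String>()
    private val right = mutableMapOf<String, String>()

    // Everything forwarded "downstream", in order, including tombstones.
    val downstream = mutableListOf<Emitted>()

    // How many times the joiner actually ran.
    var joinerCalls = 0
        private set

    private fun join(l: String, r: String): String {
        joinerCalls++
        return joiner(l, r)
    }

    // An update on the left table looks up the right table by key.
    fun updateLeft(key: String, value: String) {
        left[key] = value
        // The joiner runs only if both sides hold a value for this key;
        // otherwise null is forwarded to delete any earlier join result.
        downstream += Emitted(key, right[key]?.let { join(value, it) })
    }

    // An update on the right table looks up the left table by key.
    fun updateRight(key: String, value: String) {
        right[key] = value
        downstream += Emitted(key, left[key]?.let { join(it, value) })
    }
}

fun demo(): InnerJoinModel {
    val model = InnerJoinModel { status, action -> "$status/$action" }
    model.updateLeft("a", "submitted") // no "a" on the right yet: tombstone
    model.updateRight("b", "approve")  // no "b" on the left: tombstone
    model.updateRight("a", "approve")  // both sides have "a": joiner runs
    return model
}

fun main() {
    for (record in demo().downstream) {
        // A downstream consumer must tolerate null values (deletes).
        if (record.value == null) println("tombstone for ${record.key}")
        else println("joined ${record.key} -> ${record.value}")
    }
}
```

Running main prints two tombstones followed by a single joined record, and the joiner runs exactly once. Forwarding null on a miss, rather than emitting nothing, follows from the point that the join result is itself a table: a missing match has to be able to remove a previously emitted result for that key.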