Are you using the 0.10.0.0 release or from trunk?

On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> Hi Guozhang,
>
> Yes, I tried to apply the filter on the KTable that came from join, and
> then the foreach on the KTable that came from filter. I was still getting
> the nulls through to my foreach.
>
> It is easy to work around, but the behaviour was especially surprising when
> the filter didn't prevent it.
>
> Mathieu
>
> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Mathieu,
> >
> > As Matthias said, we are working on improving the current join semantics:
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
> >
> > and will keep you updated.
> >
> > As for KTable.filter(), I think it can actually achieve what you want: not
> > forwarding nulls to the downstream operators; have you tried it out but
> > found it is not working?
> >
> > Guozhang
> >
> > On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > > Hm... OK, I think that makes sense.
> > >
> > > It seems like I can't filter out those tombstone records; is that expected
> > > as well? If I throw in a .filter operation before my foreach, its
> > > Predicate is not invoked, and the foreach's ForeachAction is invoked with a
> > > null value still.
> > >
> > > Mathieu
> > >
> > > On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > > Hi Mathieu,
> > > >
> > > > join semantics are tricky. We are still working on better
> > > > documentation for them...
> > > >
> > > > For the current state and your question:
> > > >
> > > > Each time a record is processed, it looks up the other KTable to see if
> > > > there is a matching record. If none is found, the join result is empty
> > > > and a tombstone record with <key:null> is sent downstream. This happens
> > > > to delete any (possibly existing) previous join result for this key --
> > > > keep in mind that the result is a KTable containing the current state
> > > > of the join.
> > > >
> > > > This happens both ways; thus, if the first records of each stream do
> > > > not match on the key, both result in a <key:null> message to delete
> > > > possibly existing join tuples in the result KTable.
> > > >
> > > > Does this make sense to you?
> > > >
> > > > -Matthias
> > > >
> > > > On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > > > > Hello Kafka users,
> > > > >
> > > > > I'm seeing some unexpected results when using Kafka Streams, and I was
> > > > > hoping someone could explain them to me. I have two streams, which I've
> > > > > converted KStream->KTable, and then I am joining them together with a
> > > > > "join" (not an outer join, not a full join). With the resulting KTable
> > > > > from the join, I am performing a foreach.
> > > > >
> > > > > When I start up my Streams application, my foreach receives two records
> > > > > with valid keys but null values *before* my ValueJoiner ever gets
> > > > > executed. Why would that be?
> > > > >
> > > > > Code excerpt; please excuse the use of Kotlin here:
> > > > >
> > > > > val builder = KStreamBuilder()
> > > > >
> > > > > val approvalStatus = builder.table(
> > > > >     Serdes.String(),
> > > > >     JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > > > >     "TimesheetApprovalStatusChanged"
> > > > > )
> > > > >
> > > > > val timesheetLastApprovalAction = builder.table(
> > > > >     Serdes.String(),
> > > > >     JsonSerde(Map::class.java),
> > > > >     "TimesheetApprovalActionPerformed"
> > > > > )
> > > > >
> > > > > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> > > > >     approvalStatus, lastApprovalAction ->
> > > > >     println("EXECUTING ValueJoiner")
> > > > >     computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction as Map<String, Any?>)
> > > > > })
> > > > >
> > > > > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > > > >     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > > > >     if (timesheetStatus == null) {
> > > > >         println("SKIPPING NULL I DON'T UNDERSTAND")
> > > > >     }
> > > > > })
> > > > >
> > > > > Resulting console output:
> > > > >
> > > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > > EXECUTING ValueJoiner
> > > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > > > >
> > > > > Any explanation on why the foreach would be executing for data that
> > > > > hasn't been generated by my join?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mathieu
> >
> > --
> > -- Guozhang

--
-- Guozhang
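
To make the join behaviour Matthias describes in the quoted message concrete, here is a minimal sketch in plain Kotlin with no Kafka dependency. It models each KTable as a map: every incoming record looks up the other side, and if there is no match a tombstone (null value) is sent downstream before any ValueJoiner call has happened. The names JoinModel, onLeft, onRight and emitted are hypothetical and exist only for this illustration; this is a toy model of the semantics discussed in this thread, not how Kafka Streams is implemented, and it does not try to reproduce the exact console output from the original message.

```kotlin
// Toy model of a KTable-KTable inner join as described in this thread.
// Not Kafka Streams code: JoinModel, onLeft, onRight and emitted are
// hypothetical names used only for illustration.
class JoinModel<K : Any, L : Any, R : Any, V : Any>(private val joiner: (L, R) -> V) {
    private val left = mutableMapOf<K, L>()   // current state of the left table
    private val right = mutableMapOf<K, R>()  // current state of the right table

    // Every record sent downstream, in order; a null value is a tombstone.
    val emitted = mutableListOf<Pair<K, V?>>()

    fun onLeft(key: K, value: L) {
        left[key] = value
        emit(key)
    }

    fun onRight(key: K, value: R) {
        right[key] = value
        emit(key)
    }

    // Look up both sides. If both are present, call the joiner; otherwise
    // emit a tombstone so any earlier join result for this key is deleted.
    private fun emit(key: K) {
        val l = left[key]
        val r = right[key]
        val result: V? = if (l != null && r != null) joiner(l, r) else null
        emitted.add(key to result)
    }
}

fun main() {
    val join = JoinModel<String, String, String, String> { l, r -> "$l+$r" }

    join.onLeft("a", "approved")     // no right-side match yet
    join.onRight("b", "submitted")   // no left-side match for "b"
    join.onRight("a", "submitted")   // both sides now have "a"

    for ((key, value) in join.emitted) {
        println("$key -> $value")
    }

    // The workaround mentioned above: skip tombstones downstream.
    val nonNull = join.emitted.filter { it.second != null }
    println("non-null results: $nonNull")
}
```

In this model the first two records produce tombstones and only the third reaches the joiner, which mirrors the "null before ValueJoiner" ordering described in the thread. Skipping nulls in the downstream consumer is the simple workaround Mathieu mentions.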