I think this is a bug in 0.10.0 that we already fixed in trunk some time ago.
Guozhang

On Wed, Jul 20, 2016 at 12:25 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> I'm using the 0.10.0.0 release.
>
> Matthias's suggestion of using .toStream().filter(...).foreach(...) does
> prevent the nulls from reaching the foreach. But .filter(...).foreach(...)
> does not; the filter's predicate is not even executed before the
> ForeachAction receives the null records.
>
> On Wed, Jul 20, 2016 at 12:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > Try to do a
> >
> >     .toStream().filter(...).foreach(...)
> >
> > -Matthias
> >
> > On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> > > Are you using the 0.10.0.0 release or from trunk?
> > >
> > > On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> > >> Hi Guozhang,
> > >>
> > >> Yes, I tried applying the filter to the KTable that came from the join,
> > >> and then the foreach to the KTable that came from the filter. I was
> > >> still getting the nulls through to my foreach.
> > >>
> > >> It is easy to work around, but the behaviour was especially surprising
> > >> when the filter didn't prevent it.
> > >>
> > >> Mathieu
> > >>
> > >> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> Hi Mathieu,
> > >>>
> > >>> As Matthias said, we are working on improving the current join semantics:
> > >>>
> > >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
> > >>>
> > >>> and will keep you updated.
> > >>>
> > >>> As for KTable.filter(), I think it can actually achieve what you want:
> > >>> not forwarding nulls to the downstream operators. Have you tried it out
> > >>> and found that it is not working?
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> > >>>> Hm... OK, I think that makes sense.
> > >>>>
> > >>>> It seems like I can't filter out those tombstone records; is that
> > >>>> expected as well? If I throw in a .filter operation before my foreach,
> > >>>> its Predicate is not invoked, and the foreach's ForeachAction is still
> > >>>> invoked with a null value.
> > >>>>
> > >>>> Mathieu
> > >>>>
> > >>>> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>> Hi Mathieu,
> > >>>>>
> > >>>>> Join semantics are tricky. We are still working on better
> > >>>>> documentation for them...
> > >>>>>
> > >>>>> For the current state and your question:
> > >>>>>
> > >>>>> Each time a record is processed, the join looks up the other KTable to
> > >>>>> see whether there is a matching record. If none is found, the join
> > >>>>> result is empty and a tombstone record <key:null> is sent downstream.
> > >>>>> This happens in order to delete any (possibly existing) previous join
> > >>>>> result for this key -- keep in mind that the result is a KTable
> > >>>>> containing the current state of the join.
> > >>>>>
> > >>>>> This happens in both directions; thus, if the first records of your two
> > >>>>> streams do not match on the key, both result in a <key:null> message
> > >>>>> to delete possibly existing join tuples in the result KTable.
> > >>>>>
> > >>>>> Does this make sense to you?
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > >>>>>> Hello Kafka users,
> > >>>>>>
> > >>>>>> I'm seeing some unexpected results when using Kafka Streams, and I was
> > >>>>>> hoping someone could explain them to me. I have two streams, which
> > >>>>>> I've converted KStream->KTable, and then I am joining them together
> > >>>>>> with a "join" (not an outer join, not a full join). With the resulting
> > >>>>>> KTable from the join, I am performing a foreach.
> > >>>>>>
> > >>>>>> When I start up my Streams application, my foreach receives two records
> > >>>>>> with valid keys but null values *before* my ValueJoiner ever gets
> > >>>>>> executed. Why would that be?
> > >>>>>>
> > >>>>>> Code excerpt; please excuse the use of Kotlin here:
> > >>>>>>
> > >>>>>>     val builder = KStreamBuilder()
> > >>>>>>
> > >>>>>>     val approvalStatus = builder.table(
> > >>>>>>         Serdes.String(),
> > >>>>>>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > >>>>>>         "TimesheetApprovalStatusChanged"
> > >>>>>>     )
> > >>>>>>
> > >>>>>>     val timesheetLastApprovalAction = builder.table(
> > >>>>>>         Serdes.String(),
> > >>>>>>         JsonSerde(Map::class.java),
> > >>>>>>         "TimesheetApprovalActionPerformed"
> > >>>>>>     )
> > >>>>>>
> > >>>>>>     val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> > >>>>>>         approvalStatus, lastApprovalAction ->
> > >>>>>>         println("EXECUTING ValueJoiner")
> > >>>>>>         computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > >>>>>>             lastApprovalAction as Map<String, Any?>)
> > >>>>>>     })
> > >>>>>>
> > >>>>>>     timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > >>>>>>         println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > >>>>>>         if (timesheetStatus == null) {
> > >>>>>>             println("SKIPPING NULL I DON'T UNDERSTAND")
> > >>>>>>         }
> > >>>>>>     })
> > >>>>>>
> > >>>>>> Resulting console output:
> > >>>>>>
> > >>>>>>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > >>>>>>     SKIPPING NULL I DON'T UNDERSTAND
> > >>>>>>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > >>>>>>     SKIPPING NULL I DON'T UNDERSTAND
> > >>>>>>     EXECUTING ValueJoiner
> > >>>>>>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > >>>>>>
> > >>>>>> Any explanation of why the foreach would be executing for data that
> > >>>>>> hasn't been generated by my join?
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Mathieu
> > >>>
> > >>> --
> > >>> -- Guozhang
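For readers finding this thread later, the tombstone behaviour Matthias describes can be illustrated with a small, self-contained Kotlin model. This is a hypothetical sketch, not the Kafka Streams implementation: `Change`, `InnerTableJoin`, and `simulate` are made-up names, and the model ignores serdes, caching, and deletions arriving on either input. It shows why an inner KTable-KTable join forwards `<key:null>` records before the ValueJoiner ever runs, and why dropping null values downstream (the analogue of the `.toStream().filter(...).foreach(...)` workaround) leaves only real join results.

```kotlin
// Hypothetical, simplified model of an inner KTable-KTable join.
// NOT the Kafka Streams implementation: no serdes, caching, or input deletions.

// One change record forwarded downstream; a null value is a tombstone.
data class Change<V : Any>(val key: String, val value: V?)

class InnerTableJoin<L : Any, R : Any, J : Any>(private val joiner: (L, R) -> J) {
    // Current state of each input table, keyed by record key.
    private val left = HashMap<String, L>()
    private val right = HashMap<String, R>()

    // A record arrives on the left table: store it, then look up the right table.
    fun onLeft(key: String, value: L): Change<J> {
        left[key] = value
        val other = right[key]
        // No match: forward a tombstone so any earlier join result for this key is deleted.
        return if (other == null) Change<J>(key, null) else Change(key, joiner(value, other))
    }

    // Symmetric case: a record arrives on the right table and looks up the left table.
    fun onRight(key: String, value: R): Change<J> {
        right[key] = value
        val other = left[key]
        return if (other == null) Change<J>(key, null) else Change(key, joiner(other, value))
    }
}

// Replays a hypothetical sequence of input records and returns the join's changelog.
fun simulate(): List<Change<String>> {
    val join = InnerTableJoin<String, String, String> { status, action -> "$status/$action" }
    return listOf(
        join.onLeft("ts-1", "submitted"),   // right side is empty -> tombstone
        join.onLeft("ts-1", "submitting"),  // still no match -> tombstone
        join.onRight("ts-1", "approve"),    // match found -> the joiner runs for the first time
        join.onRight("ts-2", "reject")      // no left record for ts-2 -> tombstone
    )
}

fun main() {
    val changelog = simulate()
    changelog.forEach { println("raw: ${it.key} -> ${it.value}") }
    // Analogue of toStream().filter(...).foreach(...): drop tombstones before acting on results.
    changelog.filter { it.value != null }
        .forEach { println("filtered: ${it.key} -> ${it.value}") }
}
```

Running `main` prints four `raw:` lines (null values for ts-1, ts-1, and ts-2, plus `ts-1 -> submitting/approve`) and a single `filtered:` line for `ts-1 -> submitting/approve`. The tombstones are deliberate in this model: the join result is a table, so a non-match must retract whatever result may previously have existed for that key.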