Try doing a .toStream().filter(...).foreach(...) instead.
-Matthias

On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> Are you using the 0.10.0.0 release or from trunk?
>
> On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Yes, I tried to apply the filter on the KTable that came from the join, and
>> then the foreach on the KTable that came from the filter. I was still getting
>> the nulls through to my foreach.
>>
>> It is easy to work around, but the behaviour was especially surprising when
>> the filter didn't prevent it.
>>
>> Mathieu
>>
>> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Mathieu,
>>>
>>> As Matthias said, we are working on improving the current join semantics:
>>>
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
>>>
>>> and will keep you updated.
>>>
>>> As for KTable.filter(), I think it can actually achieve what you want:
>>> not forwarding nulls to the downstream operators. Have you tried it out
>>> and found that it is not working?
>>>
>>> Guozhang
>>>
>>> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>>>
>>>> Hm... OK, I think that makes sense.
>>>>
>>>> It seems like I can't filter out those tombstone records; is that expected
>>>> as well? If I throw in a .filter operation before my foreach, its
>>>> Predicate is not invoked, and the foreach's ForeachAction is still invoked
>>>> with a null value.
>>>>
>>>> Mathieu
>>>>
>>>> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Hi Mathieu,
>>>>>
>>>>> Join semantics are tricky. We are still working on better
>>>>> documentation for them...
>>>>>
>>>>> For the current state and your question:
>>>>>
>>>>> Each time a record is processed, it looks up the other KTable to see if
>>>>> there is a matching record. If none is found, the join result is empty
>>>>> and a tombstone record with <key:null> is sent downstream. This happens
>>>>> to delete any (possibly existing) previous join result for this key;
>>>>> keep in mind that the result is a KTable containing the current state
>>>>> of the join.
>>>>>
>>>>> This happens both ways. Thus, if the first records of your two streams
>>>>> do not match on the key, both result in a <key:null> message to delete
>>>>> possibly existing join tuples in the result KTable.
>>>>>
>>>>> Does this make sense to you?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
>>>>>> Hello Kafka users,
>>>>>>
>>>>>> I'm seeing some unexpected results when using Kafka Streams, and I was
>>>>>> hoping someone could explain them to me. I have two streams, which I've
>>>>>> converted KStream->KTable, and then I am joining them together with a
>>>>>> "join" (not an outer join, not a full join). With the resulting KTable
>>>>>> from the join, I am performing a foreach.
>>>>>>
>>>>>> When I start up my Streams application, my foreach receives two records
>>>>>> with valid keys but null values *before* my ValueJoiner ever gets
>>>>>> executed. Why would that be?
>>>>>>
>>>>>> Code excerpt; please excuse the use of Kotlin here:
>>>>>>
>>>>>>     val builder = KStreamBuilder()
>>>>>>
>>>>>>     val approvalStatus = builder.table(
>>>>>>         Serdes.String(),
>>>>>>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>>>>>>         "TimesheetApprovalStatusChanged"
>>>>>>     )
>>>>>>
>>>>>>     val timesheetLastApprovalAction = builder.table(
>>>>>>         Serdes.String(),
>>>>>>         JsonSerde(Map::class.java),
>>>>>>         "TimesheetApprovalActionPerformed"
>>>>>>     )
>>>>>>
>>>>>>     val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
>>>>>>         approvalStatus, lastApprovalAction ->
>>>>>>         println("EXECUTING ValueJoiner")
>>>>>>         computeTimesheetStatus(approvalStatus.approvalStatus!!,
>>>>>>             lastApprovalAction as Map<String, Any?>)
>>>>>>     })
>>>>>>
>>>>>>     timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>>>>>>         println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>>>>>>         if (timesheetStatus == null) {
>>>>>>             println("SKIPPING NULL I DON'T UNDERSTAND")
>>>>>>         }
>>>>>>     })
>>>>>>
>>>>>> Resulting console output:
>>>>>>
>>>>>>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
>>>>>>     SKIPPING NULL I DON'T UNDERSTAND
>>>>>>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
>>>>>>     SKIPPING NULL I DON'T UNDERSTAND
>>>>>>     EXECUTING ValueJoiner
>>>>>>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
>>>>>>
>>>>>> Any explanation of why the foreach would be executing for data that
>>>>>> hasn't been generated by my join?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mathieu
>>>>>>
>>>>>
>>>>
>>>
>>> -- Guozhang
>>>
>>
>
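Matthias's description of the inner KTable-KTable join in the thread above can be modeled in a few lines. This is a hypothetical, dependency-free sketch, not Kafka Streams code: `InnerTableJoin`, `updateLeft`, `updateRight`, `deleteRight` and `applyChange` are made-up names for illustration. Each update stores its value, looks up the other side, and emits either a joined value or a `<key:null>` tombstone; applying those changes to a map shows that a tombstone is simply a delete, which is a no-op when no earlier join result exists for the key.

```kotlin
// Hypothetical model of an inner KTable-KTable join as described in the
// thread; all names are illustrative and not part of the Kafka Streams API.
class InnerTableJoin(private val joiner: (String, String) -> String) {
    private val left = mutableMapOf<String, String>()
    private val right = mutableMapOf<String, String>()

    // Each update returns the record the join would send downstream:
    // the joined value on a match, otherwise (key, null), i.e. a tombstone.
    fun updateLeft(key: String, value: String): Pair<String, String?> {
        left[key] = value
        return key to right[key]?.let { joiner(value, it) }
    }

    fun updateRight(key: String, value: String): Pair<String, String?> {
        right[key] = value
        return key to left[key]?.let { joiner(it, value) }
    }

    // With no right-hand value there can be no inner-join result,
    // so a tombstone is always emitted for the key.
    fun deleteRight(key: String): Pair<String, String?> {
        right.remove(key)
        return key to null
    }
}

// The join result is a table: a tombstone deletes the key (no-op if absent),
// any other value inserts or replaces it.
fun MutableMap<String, String>.applyChange(change: Pair<String, String?>) {
    val (key, value) = change
    if (value == null) remove(key) else put(key, value)
}

fun main() {
    val join = InnerTableJoin { status, action -> "$status/$action" }
    val result = mutableMapOf<String, String>()
    val changes = listOf(
        join.updateLeft("k1", "submitted"), // nothing on the right yet: tombstone
        join.updateRight("k2", "approve"),  // no left value for k2: tombstone
        join.updateRight("k1", "approve"),  // match: joined value
        join.deleteRight("k1")              // match gone: tombstone deletes k1
    )
    for (change in changes) {
        println("downstream: $change")
        result.applyChange(change)
        println("result table: $result")
    }
}
```

The first two changes correspond to the situation Matthias describes: the first records on each side do not match on the key, so both produce `<key:null>` messages before any joined value appears.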
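The `.toStream().filter(...).foreach(...)` suggestion at the top of the thread works because `toStream()` turns the table's changelog into a record stream, where a tombstone is just another record whose null value the predicate can test and drop. Per Mathieu's report, the KTable `filter` instead passed the null through without calling the predicate. In the real topology the suggestion would presumably read `timesheetStatus.toStream().filter { _, status -> status != null }.foreach { key, status -> ... }`; that line is untested here. The self-contained model below contrasts the two behaviors. `Update`, `ktableStyleFilter` and `streamStyleFilter` are hypothetical names, and `ktableStyleFilter` only mimics the reported behavior plus changelog delete semantics (a value failing the predicate becomes a tombstone); it is not the library's implementation.

```kotlin
// Hypothetical, dependency-free model; not the Kafka Streams API.

// One changelog entry; a null value is a tombstone (delete) for the key.
data class Update(val key: String, val value: String?)

// Mimics a KTable-style filter: a tombstone is forwarded without calling the
// predicate (as reported in the thread), and a value that fails the predicate
// is turned into a tombstone instead of being dropped (delete semantics).
fun ktableStyleFilter(
    updates: List<Update>,
    predicate: (String, String) -> Boolean
): List<Update> = updates.map { update ->
    val value = update.value
    when {
        value == null -> update
        predicate(update.key, value) -> update
        else -> Update(update.key, null)
    }
}

// Mimics toStream().filter(...): every record, tombstones included, reaches
// the predicate and is either kept or dropped, so nulls can be removed.
fun streamStyleFilter(
    updates: List<Update>,
    predicate: (String, String?) -> Boolean
): List<Update> = updates.filter { predicate(it.key, it.value) }

fun main() {
    // Shaped like the join output in the original message: two nulls, then a value.
    val joinOutput = listOf(
        Update("timesheet-1", null),
        Update("timesheet-1", null),
        Update("timesheet-1", "submitting")
    )
    ktableStyleFilter(joinOutput) { _, status -> status.isNotEmpty() }
        .forEach { println("KTable-style foreach: $it") }
    streamStyleFilter(joinOutput) { _, status -> status != null }
        .forEach { println("stream-style foreach: $it") }
}
```

Running it, the KTable-style path still hands both nulls to `forEach`, while the stream-style path passes only the joined value.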