I think this is a bug in 0.10.0 that we already fixed in trunk some
time ago.

Guozhang

On Wed, Jul 20, 2016 at 12:25 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> I'm using the 0.10.0.0 release.
>
> Matthias's suggestion of using .toStream().filter(...).foreach(...) does
> prevent the nulls from reaching the foreach.  But
> .filter(...).foreach(...) does not; the filter's predicate is not even
> executed before the ForeachAction receives the null records.
>
>
>
> On Wed, Jul 20, 2016 at 12:30 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Try to do a
> >
> > .toStream().filter(...).foreach(...)
> >
> >
> > -Matthias
> >
> >
> > On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> > > Are you using the 0.10.0.0 release or from trunk?
> > >
> > > On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <
> > > mathieu.fenn...@replicon.com> wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> Yes, I tried to apply the filter on the KTable that came from join,
> > >> and then the foreach on the KTable that came from filter.  I was
> > >> still getting the nulls through to my foreach.
> > >>
> > >> It is easy to work around, but the behaviour was especially
> > >> surprising when the filter didn't prevent it.
> > >>
> > >> Mathieu
> > >>
> > >>
> > >> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Mathieu,
> > >>>
> > >>> As Matthias said, we are working on improving the current join
> > >>> semantics:
> > >>>
> > >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
> > >>>
> > >>> and will keep you updated.
> > >>>
> > >>>
> > >>> As for KTable.filter(), I think it can actually achieve what you
> > >>> want: not forwarding nulls to the downstream operators. Have you
> > >>> tried it and found that it does not work?
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
> > >>> mathieu.fenn...@replicon.com> wrote:
> > >>>
> > >>>> Hm... OK, I think that makes sense.
> > >>>>
> > >>>> It seems like I can't filter out those tombstone records; is that
> > >>>> expected as well?  If I throw in a .filter operation before my
> > >>>> foreach, its Predicate is not invoked, and the foreach's
> > >>>> ForeachAction is invoked with a null value still.
> > >>>>
> > >>>> Mathieu
> > >>>>
> > >>>>
> > >>>> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Mathieu,
> > >>>>>
> > >>>>> Join semantics are tricky. We are still working on better
> > >>>>> documentation for them...
> > >>>>>
> > >>>>> For the current state and your question:
> > >>>>>
> > >>>>> Each time a record is processed, the join looks up the other KTable
> > >>>>> to see if there is a matching record. If none is found, the join
> > >>>>> result is empty and a tombstone record with <key:null> is sent
> > >>>>> downstream. This is done to delete any (possibly existing) previous
> > >>>>> join result for this key; keep in mind that the result is a KTable
> > >>>>> containing the current state of the join.
> > >>>>>
> > >>>>> This happens in both directions. Thus, if the first records of each
> > >>>>> stream do not match on the key, both result in a <key:null> message
> > >>>>> to delete possibly existing join tuples in the result KTable.
> > >>>>>
> > >>>>> Does this make sense to you?
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > >>>>>> Hello Kafka users,
> > >>>>>>
> > >>>>>> I'm seeing some unexpected results when using Kafka Streams, and I
> > >>>>>> was hoping someone could explain them to me.  I have two streams,
> > >>>>>> which I've converted KStream->KTable, and then I am joining them
> > >>>>>> together with a "join" (not an outer join, not a full join).  With
> > >>>>>> the resulting KTable from the join, I am performing a foreach.
> > >>>>>>
> > >>>>>> When I start up my Streams application, my foreach receives two
> > >>>>>> records with valid keys but null values *before* my ValueJoiner ever
> > >>>>>> gets executed.  Why would that be?
> > >>>>>>
> > >>>>>> Code excerpt; please excuse the use of Kotlin here:
> > >>>>>>
> > >>>>>> val builder = KStreamBuilder()
> > >>>>>>
> > >>>>>> val approvalStatus = builder.table(
> > >>>>>>         Serdes.String(),
> > >>>>>>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > >>>>>>         "TimesheetApprovalStatusChanged"
> > >>>>>> )
> > >>>>>>
> > >>>>>> val timesheetLastApprovalAction = builder.table(
> > >>>>>>         Serdes.String(),
> > >>>>>>         JsonSerde(Map::class.java),
> > >>>>>>         "TimesheetApprovalActionPerformed"
> > >>>>>> )
> > >>>>>>
> > >>>>>> val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction,
> > >>>>>>         { approvalStatus, lastApprovalAction ->
> > >>>>>>     println("EXECUTING ValueJoiner")
> > >>>>>>     computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > >>>>>>             lastApprovalAction as Map<String, Any?>)
> > >>>>>> })
> > >>>>>>
> > >>>>>> timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > >>>>>>     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > >>>>>>     if (timesheetStatus == null) {
> > >>>>>>         println("SKIPPING NULL I DON'T UNDERSTAND")
> > >>>>>>     }
> > >>>>>> })
> > >>>>>>
> > >>>>>>
> > >>>>>> Resulting console output:
> > >>>>>>
> > >>>>>> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > >>>>>> SKIPPING NULL I DON'T UNDERSTAND
> > >>>>>> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > >>>>>> SKIPPING NULL I DON'T UNDERSTAND
> > >>>>>> EXECUTING ValueJoiner
> > >>>>>> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > >>>>>>
> > >>>>>>
> > >>>>>> Any explanation of why the foreach would be executing for data that
> > >>>>>> hasn't been generated by my join?
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Mathieu
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
> > >
> > >
> > >
> >
> >
>



-- 
-- Guozhang
