Are you using the 0.10.0.0 release or from trunk?

On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Hi Guozhang,
>
> Yes, I tried to apply the filter on the KTable that came from the join, and
> then the foreach on the KTable that came from the filter. I was still getting
> the nulls through to my foreach.
>
> It is easy to work around, but the behaviour was especially surprising when
> the filter didn't prevent it.
>
> Mathieu
>
>
> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Mathieu,
> >
> > As Matthias said, we are working on improving the current join semantics:
> >
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
> >
> > and will keep you updated.
> >
> >
> > As for KTable.filter(), I think it can actually achieve what you want:
> > not forwarding nulls to the downstream operators. Have you tried it out
> > and found that it is not working?
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > > Hm... OK, I think that makes sense.
> > >
> > > It seems like I can't filter out those tombstone records; is that
> > > expected as well? If I throw in a .filter operation before my foreach,
> > > its Predicate is not invoked, and the foreach's ForeachAction is still
> > > invoked with a null value.
> > >
> > > Mathieu
> > >
> > >
> > > On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > > Hi Mathieu,
> > > >
> > > > join semantics are tricky. We are still working on better
> > > > documentation for it...
> > > >
> > > > For the current state and your question:
> > > >
> > > > Each time a record is processed, it looks up the other KTable to see if
> > > > there is a matching record. If none is found, the join result is empty
> > > > and a tombstone record with <key:null> is sent downstream. This happens
> > > > to delete any (possibly existing) previous join result for this key --
> > > > keep in mind that the result is a KTable containing the current state
> > > > of the join.
> > > >
> > > > This happens both ways; thus, if the first records of your two streams
> > > > do not match on the key, both result in a <key:null> message to delete
> > > > possibly existing join tuples in the result KTable.
> > > >
> > > > Does this make sense to you?
> > > >
> > > > -Matthias
> > > >
> > > > On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > > > > Hello Kafka users,
> > > > >
> > > > > I'm seeing some unexpected results when using Kafka Streams, and I was
> > > > > hoping someone could explain them to me. I have two streams, which I've
> > > > > converted KStream->KTable, and then I am joining them together with a
> > > > > "join" (not an outer join, not a full join). With the resulting KTable
> > > > > from the join, I am performing a foreach.
> > > > >
> > > > > When I start up my Streams application, my foreach receives two records
> > > > > with valid keys but null values *before* my ValueJoiner ever gets
> > > > > executed. Why would that be?
> > > > >
> > > > > Code excerpt; please excuse the use of Kotlin here:
> > > > >
> > > > > import org.apache.kafka.common.serialization.Serdes
> > > > > import org.apache.kafka.streams.kstream.KStreamBuilder
> > > > >
> > > > > // JsonSerde, TimesheetApprovalStatusChangedMessage and
> > > > > // computeTimesheetStatus are application code, not part of Kafka.
> > > > > val builder = KStreamBuilder()
> > > > >
> > > > > val approvalStatus = builder.table(
> > > > >         Serdes.String(),
> > > > >         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > > > >         "TimesheetApprovalStatusChanged"
> > > > > )
> > > > >
> > > > > val timesheetLastApprovalAction = builder.table(
> > > > >         Serdes.String(),
> > > > >         JsonSerde(Map::class.java),
> > > > >         "TimesheetApprovalActionPerformed"
> > > > > )
> > > > >
> > > > > // Inner KTable-KTable join; the lambda is the ValueJoiner.
> > > > > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction,
> > > > >         { approvalStatus, lastApprovalAction ->
> > > > >     println("EXECUTING ValueJoiner")
> > > > >     computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > > > >             lastApprovalAction as Map<String, Any?>)
> > > > > })
> > > > >
> > > > > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > > > >     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
> > > > >     if (timesheetStatus == null) {
> > > > >         println("SKIPPING NULL I DON'T UNDERSTAND")
> > > > >     }
> > > > > })
> > > > >
> > > > >
> > > > > Resulting console output:
> > > > >
> > > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> > > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > > EXECUTING ValueJoiner
> > > > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> > > > >
> > > > >
> > > > > Is there any explanation for why the foreach would be executing for data
> > > > > that hasn't been generated by my join?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mathieu
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
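
The tombstone behaviour Matthias describes can be illustrated with a small, self-contained Kotlin sketch. This is a hypothetical in-memory model, not the Kafka Streams implementation: `InnerJoinModel` and `joinScenario` are illustrative names, and the model assumes that every update on either side immediately emits either the joined value or a null tombstone for that key.

```kotlin
// Hypothetical in-memory model of a KTable-KTable inner join, written only to
// illustrate the tombstone semantics described in this thread. It is NOT the
// Kafka Streams implementation.
class InnerJoinModel<K : Any, L : Any, R : Any, V : Any>(
    private val joiner: (L, R) -> V
) {
    private val left = mutableMapOf<K, L>()
    private val right = mutableMapOf<K, R>()

    // Everything the join "sends downstream": a joined value, or null (a tombstone)
    // meaning "there is currently no join result for this key".
    val emitted = mutableListOf<Pair<K, V?>>()

    fun updateLeft(key: K, value: L) {
        left[key] = value
        // Look up the other table; without a match the joiner is never called.
        val joined = right[key]?.let { joiner(value, it) }
        emitted += key to joined
    }

    fun updateRight(key: K, value: R) {
        right[key] = value
        val joined = left[key]?.let { joiner(it, value) }
        emitted += key to joined
    }
}

// Illustrative scenario (hypothetical keys and values): the first record on each
// side has no partner yet, so each one produces a tombstone before any join happens.
fun joinScenario(): List<Pair<String, String?>> {
    val join = InnerJoinModel<String, String, String, String> { status, action ->
        "$status+$action"
    }
    join.updateLeft("ts-1", "submitting")  // right side has no "ts-1" -> ("ts-1", null)
    join.updateRight("ts-2", "submit")     // left side has no "ts-2" -> ("ts-2", null)
    join.updateRight("ts-1", "submit")     // match -> joiner runs
    return join.emitted
}
```

Running `joinScenario()` yields two null-valued records before the joiner is ever called, followed by one joined value, the same shape as the console output quoted above. The thread does not establish exactly which updates produced Mathieu's two nulls, so treat this as an illustration of the mechanism only. A downstream consumer of a KTable has to read a null value as "delete this key".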

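Mathieu's report that a filter on the join result still passed the nulls through, without invoking its Predicate, fits the same picture: a filter over a table must propagate deletes, because dropping a tombstone would leave a stale row downstream. The sketch below contrasts that with a stream-style filter. `tableFilter`, `streamFilter`, `FilterComparison` and `filterScenario` are hypothetical helpers, not Kafka Streams functions, and the table-side rules (tombstones pass through without calling the predicate; rejected values turn into tombstones) are an assumption about the 0.10.x behaviour, consistent with what is reported in the thread.

```kotlin
// Hypothetical helpers modelling two kinds of filter over (key, value) updates, where
// a null value is a tombstone. These are NOT Kafka Streams APIs.

// Table-style filter (assumed semantics): a row that fails the predicate must be
// deleted downstream, so it becomes a tombstone; incoming tombstones are passed on
// as-is without ever calling the predicate.
fun <K, V : Any> tableFilter(
    changes: List<Pair<K, V?>>,
    predicate: (K, V) -> Boolean
): List<Pair<K, V?>> = changes.map { (key, value) ->
    key to (if (value != null && predicate(key, value)) value else null)
}

// Stream-style filter: every record, null-valued or not, is offered to the
// predicate, and records that fail it are dropped entirely.
fun <K, V> streamFilter(
    records: List<Pair<K, V?>>,
    predicate: (K, V?) -> Boolean
): List<Pair<K, V?>> = records.filter { (key, value) -> predicate(key, value) }

data class FilterComparison(
    val viaTable: List<Pair<String, String?>>,
    val tablePredicateCalls: Int,
    val viaStream: List<Pair<String, String?>>
)

// Illustrative join output: two tombstones followed by one real result.
fun filterScenario(): FilterComparison {
    val joinOutput: List<Pair<String, String?>> = listOf(
        "ts-1" to null,
        "ts-2" to null,
        "ts-1" to "submitting+submit"
    )
    var calls = 0
    val viaTable = tableFilter(joinOutput) { _, status ->
        calls++  // count how often the table-style predicate actually runs
        status.isNotEmpty()
    }
    val viaStream = streamFilter(joinOutput) { _, status -> status != null }
    return FilterComparison(viaTable, calls, viaStream)
}
```

In the model only the stream-style filter removes the nulls, and the table-style predicate runs once, for the single non-null value. With the real API, a comparable workaround would be to leave table semantics before filtering, for example calling `toStream()` on the join result and then applying `filter` and `foreach` to the resulting KStream; verify this against the release in use, since the thread leaves open whether 0.10.0.0 or trunk is involved.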