Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Guozhang Wang
I think this is a bug in 0.10.0 that we already fixed in trunk some
time ago.

Guozhang

On Wed, Jul 20, 2016 at 12:25 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> I'm using the 0.10.0.0 release.
>
> Matthias's suggestion of using .toStream().filter(...).foreach(...) does
> prevent the nulls from reaching the foreach.  But
> .filter(...).foreach(...) does not; the filter's predicate is not even
> executed before the ForeachAction receives the null records.

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
I'm using the 0.10.0.0 release.

Matthias's suggestion of using .toStream().filter(...).foreach(...) does
prevent the nulls from reaching the foreach.  But
.filter(...).foreach(...) does not; the filter's predicate is not even
executed before the ForeachAction receives the null records.



On Wed, Jul 20, 2016 at 12:30 PM, Matthias J. Sax 
wrote:

> Try to do a
>
> .toStream().filter(...).foreach(...)
>
>
> -Matthias

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Matthias J. Sax
Try to do a

.toStream().filter(...).foreach(...)


-Matthias
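
As a rough illustration of why this chain keeps the nulls away from the ForeachAction, here is a plain-Kotlin model, not Kafka Streams code. The list stands in for the records leaving the join, a null value marks a tombstone, and `forEachNonNull` is a hypothetical helper name. It assumes the predicate passed to filter(...) simply rejects null values, which the thread does not spell out.

```kotlin
// Plain-Kotlin model of "filter out tombstones, then run the action".
// The record list and the helper name are assumptions made for illustration.
fun forEachNonNull(
    records: List<Pair<String, String?>>,
    action: (String, String) -> Unit
) {
    for ((key, value) in records) {
        if (value == null) continue   // filter step: the tombstone is dropped here
        action(key, value)            // foreach step: only non-null values arrive
    }
}

fun main() {
    // Two tombstones followed by one real join result.
    val records = listOf(
        "timesheet-1" to null,
        "timesheet-1" to null,
        "timesheet-1" to "submitting"
    )
    forEachNonNull(records) { key, status -> println("$key, status: $status") }
}
```

In this model the action runs once, for the single non-null record; the two tombstones never reach it.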


On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> Are you using the 0.10.0.0 release or from trunk?

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Guozhang Wang
Are you using the 0.10.0.0 release or from trunk?

On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Guozhang,
>
> Yes, I tried to apply the filter on the KTable that came from join, and
> then the foreach on the KTable that came from filter.  I was still getting
> the nulls through to my foreach.
>
> It is easy to work around, but the behaviour was especially surprising when
> the filter didn't prevent it.
>
> Mathieu

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
Hi Guozhang,

Yes, I tried to apply the filter on the KTable that came from join, and
then the foreach on the KTable that came from filter.  I was still getting
the nulls through to my foreach.

It is easy to work around, but the behaviour was especially surprising when
the filter didn't prevent it.

Mathieu


On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> As Matthias said, we are working on improving the current join semantics:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
>
> and will keep you updated.
>
>
> As for KTable.filter(), I think it can actually achieve what you want: not
> forwarding nulls to the downstream operators. Have you tried it and found
> that it does not work?
>
>
> Guozhang


Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Guozhang Wang
Hi Mathieu,

As Matthias said, we are working on improving the current join semantics:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287

and will keep you updated.


As for KTable.filter(), I think it can actually achieve what you want: not
forwarding nulls to the downstream operators. Have you tried it and found
that it does not work?


Guozhang



On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hm... OK, I think that makes sense.
>
> It seems like I can't filter out those tombstone records; is that expected
> as well?  If I throw in a .filter operation before my foreach, its
> Predicate is not invoked, and the foreach's ForeachAction is invoked with a
> null value still.
>
> Mathieu



-- 
-- Guozhang


Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
Hm... OK, I think that makes sense.

It seems like I can't filter out those tombstone records; is that expected
as well?  If I throw in a .filter operation before my foreach, its
Predicate is not invoked, and the foreach's ForeachAction is invoked with a
null value still.

Mathieu


On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax 
wrote:

> Hi Mathieu,
>
> Join semantics are tricky. We are still working on better
> documentation for them...
>
> For the current state and your question:
>
> Each time a record is processed, it looks up the other KTable to see if
> there is a matching record. If none is found, the join result is empty
> and a tombstone record (the key with a null value) is sent downstream. This
> happens in order to delete any (possibly existing) previous join result for
> this key -- keep in mind that the result is a KTable containing the current
> state of the join.
>
> This happens both ways; thus, if the first records of each stream do
> not match on the key, both result in a tombstone message that deletes
> any existing join tuples in the result KTable.
>
> Does this make sense to you?
>
> -Matthias


Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Matthias J. Sax
Hi Mathieu,

Join semantics are tricky. We are still working on better
documentation for them...

For the current state and your question:

Each time a record is processed, it looks up the other KTable to see if
there is a matching record. If none is found, the join result is empty
and a tombstone record (the key with a null value) is sent downstream. This
happens in order to delete any (possibly existing) previous join result for
this key -- keep in mind that the result is a KTable containing the current
state of the join.

This happens both ways; thus, if the first records of each stream do
not match on the key, both result in a tombstone message that deletes
any existing join tuples in the result KTable.

Does this make sense to you?

-Matthias
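
The behaviour described above can be sketched with a small in-memory model. This is plain Kotlin, not Kafka Streams code: `ToyTableJoin`, its two maps, and the `emitted` list are hypothetical stand-ins for the two KTables and the downstream operator. It only mirrors the rule stated in this message (look up the other side; forward a tombstone when nothing matches).

```kotlin
// Toy model of the join behaviour described in this message.
// Class and member names are assumptions made for illustration.
class ToyTableJoin<L : Any, R : Any, O : Any>(private val joiner: (L, R) -> O) {
    private val left = mutableMapOf<String, L>()
    private val right = mutableMapOf<String, R>()

    // Records forwarded downstream: the key plus the joined value,
    // or null when a tombstone is sent.
    val emitted = mutableListOf<Pair<String, O?>>()

    fun onLeft(key: String, value: L) {
        left[key] = value
        emit(key)
    }

    fun onRight(key: String, value: R) {
        right[key] = value
        emit(key)
    }

    private fun emit(key: String) {
        val l = left[key]
        val r = right[key]
        // No match on the other side: forward a tombstone so that any
        // previous join result for this key is deleted.
        val result = if (l != null && r != null) joiner(l, r) else null
        emitted.add(key to result)
    }
}

fun main() {
    val join = ToyTableJoin<String, String, String> { status, action -> "$status/$action" }
    join.onLeft("timesheet-1", "submitting")  // right side is empty: tombstone
    join.onRight("timesheet-2", "approve")    // no left record for this key: tombstone
    join.onRight("timesheet-1", "submit")     // both sides present: the joiner runs
    join.emitted.forEach { (key, value) -> println("$key -> $value") }
}
```

In the model, the first record on each side finds no partner and produces a null value downstream before the joiner is ever called, which is the ordering reported in the original question.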

On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> Hello Kafka users,
> 
> I'm seeing some unexpected results when using Kafka Streams, and I was
> hoping someone could explain them to me.  I have two streams, which I've
> converted KStream->KTable, and then I am joining them together with a
> "join" (not an outer join, not a full join).  With the resulting KTable
> from the join, I am performing a foreach.
> 
> When I start up my Streams application, my foreach receives two records with
> valid keys but null values *before* my ValueJoiner ever gets executed.  Why
> would that be?
> 
> Code excerpt; please excuse the use of Kotlin here:
> 
> val builder = KStreamBuilder()
>
> val approvalStatus = builder.table(
>     Serdes.String(),
>     JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>     "TimesheetApprovalStatusChanged"
> )
>
> val timesheetLastApprovalAction = builder.table(
>     Serdes.String(),
>     JsonSerde(Map::class.java),
>     "TimesheetApprovalActionPerformed"
> )
>
> val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
>     approvalStatus, lastApprovalAction ->
>     println("EXECUTING ValueJoiner")
>     computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction as Map)
> })
>
> timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>     if (timesheetStatus == null) {
>         println("SKIPPING NULL I DON'T UNDERSTAND")
>     }
> })
> 
> 
> Resulting console output:
> 
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ValueJoiner
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> 
> 
> Any explanation on why the foreach would be executing for data that hasn't
> been generated by my join?
> 
> Thanks,
> 
> Mathieu
> 


