Re: Kafka Streams: KTable join + filter + foreach
I think this is a bug in 0.10.0 that we already fixed in trunk some time ago.

Guozhang

In reply to Mathieu Fenniak (Wed, Jul 20, 2016 at 12:25 PM).
Re: Kafka Streams: KTable join + filter + foreach
I'm using the 0.10.0.0 release.

Matthias's suggestion of using .toStream().filter(...).foreach(...) does prevent the nulls from reaching the foreach. But .filter(...).foreach(...) does not; the filter's predicate is not even executed before the ForeachAction receives the null records.

In reply to Matthias J. Sax (Wed, Jul 20, 2016 at 12:30 PM).
Re: Kafka Streams: KTable join + filter + foreach
Try:

    .toStream().filter(...).foreach(...)

-Matthias

In reply to Guozhang Wang (07/20/2016 08:11 PM).
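Matthias's workaround can be illustrated with a toy model in plain Kotlin (no Kafka dependency). ChangeRecord and forEachNonNull are hypothetical names for illustration only. In Mathieu's code the real call would be something like `timesheetStatus.toStream().filter { _, status -> status != null }.foreach { key, status -> ... }`, relying on Kotlin's SAM conversion for the Java Predicate and ForeachAction interfaces. The point the model makes: once the table is turned into a record stream, the filter predicate sees every record, tombstones included, so a null check keeps them away from the foreach, which matches what Mathieu reports after trying it.

```kotlin
// Toy model, not Kafka Streams code: the join result's changelog as a list of
// (key, value) records, where a null value is a tombstone for that key.
data class ChangeRecord(val key: String, val value: String?)

// Mirrors the shape of table.toStream().filter(...).foreach(...): on a record
// stream the filter is evaluated for every record, tombstones included, so a
// null check removes them before the action runs.
fun forEachNonNull(changelog: List<ChangeRecord>, action: (String, String) -> Unit) {
    changelog
        .filter { rec -> rec.value != null }            // stream-level filter: runs for every record
        .forEach { rec -> action(rec.key, rec.value!!) } // safe: null values were removed above
}
```

For example, a changelog of two tombstones followed by a real status only invokes the action once, for the real status.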
Re: Kafka Streams: KTable join + filter + foreach
Are you using the 0.10.0.0 release or from trunk?

In reply to Mathieu Fenniak (Wed, Jul 20, 2016 at 10:58 AM).
Re: Kafka Streams: KTable join + filter + foreach
Hi Guozhang,

Yes, I tried applying the filter to the KTable that came from the join, and then the foreach to the KTable that came from the filter. I was still getting the nulls through to my foreach.

It is easy to work around, but the behaviour was especially surprising when the filter didn't prevent it.

Mathieu

In reply to Guozhang Wang (Wed, Jul 20, 2016 at 11:57 AM).
Re: Kafka Streams: KTable join + filter + foreach
Hi Mathieu,

As Matthias said, we are working on improving the current join semantics:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287

and will keep you updated.

As for KTable.filter(), I think it can actually achieve what you want: not forwarding nulls to the downstream operators. Have you tried it out and found that it is not working?

Guozhang

In reply to Mathieu Fenniak (Wed, Jul 20, 2016 at 7:42 AM).
Re: Kafka Streams: KTable join + filter + foreach
Hm... OK, I think that makes sense.

It seems like I can't filter out those tombstone records; is that expected as well? If I throw in a .filter operation before my foreach, its Predicate is not invoked, and the foreach's ForeachAction is still invoked with a null value.

Mathieu

In reply to Matthias J. Sax (Wed, Jul 20, 2016 at 8:23 AM).
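The behaviour Mathieu describes (the table-level Predicate is never called for the null records, yet they still reach the foreach) can be reproduced with a toy model. This is a sketch of the behaviour reported against 0.10.0.0 in this thread, not Kafka Streams' implementation. It assumes that a table-level filter treats a null value as a delete and forwards it without consulting the predicate, and that a row failing the predicate is forwarded as a delete for its key. TableUpdate and tableFilter are hypothetical names.

```kotlin
// Toy model of the reported 0.10.0.0 behaviour, not the library's code.
data class TableUpdate(val key: String, val value: String?)

fun tableFilter(
    updates: List<TableUpdate>,
    predicate: (String, String) -> Boolean,
    predicateCalls: MutableList<String>
): List<TableUpdate> =
    updates.map { u ->
        val v = u.value
        if (v == null) {
            // Assumption: a tombstone is a delete, forwarded as-is; the predicate is never invoked.
            u
        } else {
            predicateCalls.add(u.key) // record that the predicate ran for this key
            // Assumption: a row that fails the predicate becomes a delete for its key.
            if (predicate(u.key, v)) u else TableUpdate(u.key, null)
        }
    }
```

Because the tombstone never passes through the predicate in this model, a `value != null` check inside a table-level filter cannot remove it; only a filter on the record stream obtained via `toStream()` sees it.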
Re: Kafka Streams: KTable join + filter + foreach
Hi Mathieu,

Join semantics are tricky. We are still working on better documentation for them...

For the current state and your question:

Each time a record is processed, it looks up the other KTable to see if there is a matching record. If none is found, the join result is empty and a tombstone record (the key with a null value) is sent downstream. This happens in order to delete any (possibly existing) previous join result for this key; keep in mind that the result is a KTable containing the current state of the join.

This happens both ways. Thus, if the first records of each stream do not match on the key, both result in a message that deletes possibly existing join tuples in the result KTable.

Does this make sense to you?

-Matthias

On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> Hello Kafka users,
>
> I'm seeing some unexpected results when using Kafka Streams, and I was
> hoping someone could explain them to me. I have two streams, which I've
> converted KStream->KTable, and then I am joining them together with a
> "join" (not an outer join, not a full join). With the resulting KTable
> from the join, I am performing a foreach.
>
> When I start up my Streams application, my foreach receives two records with
> valid keys but null values *before* my ValueJoiner ever gets executed. Why
> would that be?
>
> Code excerpt; please excuse the use of Kotlin here:
>
>     val builder = KStreamBuilder()
>
>     val approvalStatus = builder.table(
>         Serdes.String(),
>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>         "TimesheetApprovalStatusChanged"
>     )
>
>     val timesheetLastApprovalAction = builder.table(
>         Serdes.String(),
>         JsonSerde(Map::class.java),
>         "TimesheetApprovalActionPerformed"
>     )
>
>     val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
>         approvalStatus, lastApprovalAction ->
>         println("EXECUTING ValueJoiner")
>         computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction as Map)
>     })
>
>     timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>         println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>         if (timesheetStatus == null) {
>             println("SKIPPING NULL I DON'T UNDERSTAND")
>         }
>     })
>
> Resulting console output:
>
>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
>     SKIPPING NULL I DON'T UNDERSTAND
>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
>     SKIPPING NULL I DON'T UNDERSTAND
>     EXECUTING ValueJoiner
>     EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
>
> Any explanation on why the foreach would be executing for data that hasn't
> been generated by my join?
>
> Thanks,
>
> Mathieu
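Matthias's description of the inner KTable-KTable join can be sketched as a toy, single-threaded model: each update on either side is stored, the other side's current value for the same key is looked up, and the joiner runs only when both sides have a value; otherwise a tombstone (null) is forwarded for that key. ToyTableJoin and its methods are hypothetical names, and the model deliberately ignores state stores, serdes, and other runtime details of Kafka Streams.

```kotlin
// Toy model of inner KTable-KTable join semantics as described in this thread;
// not Kafka Streams code.
class ToyTableJoin<V1 : Any, V2 : Any, R : Any>(private val joiner: (V1, V2) -> R) {
    private val left = mutableMapOf<String, V1>()  // current state of the left table
    private val right = mutableMapOf<String, V2>() // current state of the right table
    val output = mutableListOf<Pair<String, R?>>() // downstream changelog of the join result

    fun updateLeft(key: String, value: V1) {
        left[key] = value
        forward(key, value, right[key]) // look up the other side for the same key
    }

    fun updateRight(key: String, value: V2) {
        right[key] = value
        forward(key, left[key], value)
    }

    private fun forward(key: String, l: V1?, r: V2?) {
        // Only a match on both sides runs the joiner; otherwise a tombstone
        // deletes any earlier join result for this key.
        val result = if (l != null && r != null) joiner(l, r) else null
        output.add(key to result)
    }
}
```

One sequence that produces the same shape as the console output above is two left-side updates for a key followed by the first right-side update for it: the model emits two tombstones before the joiner ever runs. The actual order of updates in Mathieu's application is not known from the thread.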