[ https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy Coates updated KAFKA-10494:
--------------------------------
    Description:
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, unnecessarily calls `enableSendingOldValues` on the parent, even when the processor itself is materialized. This can force the parent table to be materialized unnecessarily.

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}
{{builder}}
{{    .table("t1", Consumed.with(...))}}
{{    .filter(predicate, Materialized.as("t2"))}}
{{    .<downStreamOps>}}

If `downStreamOps` results in a call to `enableSendingOldValues` on the table returned by the `filter` call, i.e. `t2`, then `t1` will be materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add the following to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{    final StreamsBuilder builder = new StreamsBuilder();}}
{{    final String topic1 = "topic1";}}
{{    final KTableImpl<String, Integer, Integer> table1 =}}
{{        (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);}}
{{    final KTableImpl<String, Integer, Integer> table2 =}}
{{        (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));}}
{{    table2.enableSendingOldValues(false);}}
{{    doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

This problem is not restricted to the `filter` call. Other processor suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, consider a call to {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not. In such a situation, the table will _not_ enable sending old values, but should.
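The behaviour this ticket asks for can be sketched with a toy model. The `TableNode` class below is hypothetical and is not part of Kafka Streams; it only assumes, as the description implies, that `enableSendingOldValues(forceMaterialization)` reports whether old values were enabled and that a materialized node can serve old values from its own store. Under those assumptions, a materialized node returns early and never touches its parent:

```java
/** Hypothetical stand-in for a table node in a topology; not a Kafka Streams class. */
final class TableNode {
    private final String name;
    private final TableNode parent;   // null for a source table
    private boolean materialized;
    private boolean sendOldValues;

    TableNode(final String name, final TableNode parent, final boolean materialized) {
        this.name = name;
        this.parent = parent;
        this.materialized = materialized;
    }

    /** Returns true if this node will forward old values after the call. */
    boolean enableSendingOldValues(final boolean forceMaterialization) {
        if (!materialized) {
            if (parent != null) {
                // No store of our own: old values can only come from upstream.
                if (!parent.enableSendingOldValues(forceMaterialization)) {
                    return false;
                }
            } else if (forceMaterialization) {
                // Source table without a store: materialize only when forced.
                materialized = true;
            } else {
                return false;
            }
        }
        // Either we own a store or upstream now supplies old values.
        sendOldValues = true;
        return true;
    }

    boolean isMaterialized() { return materialized; }

    boolean isSendingOldValues() { return sendOldValues; }

    String name() { return name; }

    public static void main(final String[] args) {
        final TableNode t1 = new TableNode("t1", null, false);
        final TableNode t2 = new TableNode("t2", t1, true);
        final boolean enabled = t2.enableSendingOldValues(false);
        System.out.println(t2.name() + " sends old values: " + enabled);
        System.out.println(t1.name() + " materialized: " + t1.isMaterialized());
    }
}
```

In this sketch, calling `enableSendingOldValues(false)` on a materialized `t2` enables old values on `t2` and leaves the unmaterialized `t1` untouched, which is the outcome the proposed `KTableFilterTest` case is meant to check.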
> Streams: enableSendingOldValues should not call parent if node is itself materialized
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10494
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10494
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Andy Coates
>            Priority: Major