[ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:
--------------------------------
    Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, 
unnecessarily calls `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

For example:
{code:java}
StreamsBuilder builder = new StreamsBuilder();
builder
     .table("t1", Consumed.with(...))
     .filter(predicate, Materialized.as("t2"))
     .<downStreamOps>
{code}
If `downStreamOps` results in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.
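
The difference can be illustrated with a toy model. This is only a sketch: `Node`, `enableEagerly` and `enableLazily` are hypothetical names invented for illustration and are not Kafka Streams classes or methods. It assumes that an unmaterialized source table has to be materialized before it can supply old values.

```java
// Toy model only: none of these types exist in Kafka Streams.
final class ParentPropagationSketch {

    static final class Node {
        final String name;
        final Node parent;          // null for a source table
        boolean materialized;
        boolean sendOldValues;

        Node(final String name, final Node parent, final boolean materialized) {
            this.name = name;
            this.parent = parent;
            this.materialized = materialized;
        }

        // Behaviour described in this ticket: always delegate to the parent.
        void enableEagerly() {
            if (parent != null) {
                parent.enableEagerly();
            } else {
                materialized = true; // assumption: a source needs a store to look up old values
            }
            sendOldValues = true;
        }

        // Suggested behaviour: stop at the first node that is itself materialized.
        void enableLazily() {
            if (!materialized) {
                if (parent != null) {
                    parent.enableLazily();
                } else {
                    materialized = true;
                }
            }
            sendOldValues = true;
        }
    }

    public static void main(final String[] args) {
        final Node eagerT1 = new Node("t1", null, false);
        final Node eagerT2 = new Node("t2", eagerT1, true);
        eagerT2.enableEagerly();
        System.out.println("eager: t1 materialized = " + eagerT1.materialized); // true

        final Node lazyT1 = new Node("t1", null, false);
        final Node lazyT2 = new Node("t2", lazyT1, true);
        lazyT2.enableLazily();
        System.out.println("lazy: t1 materialized = " + lazyT1.materialized); // false
    }
}
```

In the lazy variant `t2` can serve old values from its own store, so `t1` is left untouched.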

This ticket was raised off the back of [comments in a 
PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:
{code:java}
@Test
public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
    final StreamsBuilder builder = new StreamsBuilder();
    final String topic1 = "topic1";

    final KTableImpl<String, Integer, Integer> table1 =
        (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
    final KTableImpl<String, Integer, Integer> table2 =
        (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));

    table2.enableSendingOldValues(false);

    doTestSendingOldValue(builder, table1, table2, topic1);
}
{code}
This problem is not restricted to the filter call; other processor 
suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, a 
second problem appears: if you call {{enableSendingOldValues}} without forcing 
materialization on a table that is itself materialized, but whose upstream is 
not, the table will _not_ enable sending old values, although it should.
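
The desired outcome for that case can be sketched with another toy model. Everything here is an assumption made for illustration: the `Node` class is hypothetical, and the boolean parameter and return value only mirror the `enableSendingOldValues(false)` call used in the test above, not the actual Kafka Streams implementation.

```java
// Toy model only: none of these types exist in Kafka Streams.
final class ForceMaterializationSketch {

    static final class Node {
        final Node parent;          // null for a source table
        boolean materialized;
        boolean sendOldValues;

        Node(final Node parent, final boolean materialized) {
            this.parent = parent;
            this.materialized = materialized;
        }

        // Returns true if this node will send old values after the call.
        boolean enableSendingOldValues(final boolean forceMaterialization) {
            if (materialized) {
                // Own store is enough: no need to ask the parent.
                sendOldValues = true;
                return true;
            }
            if (parent == null) {
                // Unmaterialized source: only possible when forced.
                if (!forceMaterialization) {
                    return false;
                }
                materialized = true;
                sendOldValues = true;
                return true;
            }
            if (parent.enableSendingOldValues(forceMaterialization)) {
                sendOldValues = true;
            }
            return sendOldValues;
        }
    }

    public static void main(final String[] args) {
        final Node t1 = new Node(null, false);  // upstream, not materialized
        final Node t2 = new Node(t1, true);     // materialized filter result
        System.out.println("t2 enabled = " + t2.enableSendingOldValues(false)); // true
        System.out.println("t1 materialized = " + t1.materialized);             // false
    }
}
```

With the check on the node's own store placed first, the unforced call succeeds on `t2` and still leaves `t1` unmaterialized.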

 


 


> Streams: enableSendingOldValues should not call parent if node is itself 
> materialized
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10494
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10494
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Andy Coates
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
