mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r485846736



##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Unrelated change: can we revert this?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       Can we flip the logic of the boolean flag ? -- It makes a knob in my 
mind to say, `onlyIfMaterailized=false` implies that we need to (ie, enforce 
to) materialize...  I would prefer the suggest from Guozhang to use a flag like 
`enforceMaterialization` (or maybe better `materializeIfNeeded`).
   
   Also, the method may or may not enable sending old values and thus, we might 
want to rename it to `maybeEnableSendOldValues` (or 
`enableSendOldValuesIfPossible` or similar).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       Instead of the check, should we pass in `false` to enforce a 
materialization if necessary?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       Why does the `filter` need to send old values? I though the `filter` 
needs to _receive_ old values, and thus we should call `this. 
enableSendingOldValues(true)` to enable sending old values on the filter's 
input?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       Why do we need to call this (ie, why do we want the result of the filter 
to send old values)?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {

Review comment:
       `shouldNotSetSendingOldValuesIfNotMaterialized`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", 
consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void testSendingOldValuesSetIfMaterializedForced() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", 
consumed);

Review comment:
       Seems the `Matererialized.as("fred")` parameter is missing?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Seems the boolean input parameter is missing?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void shouldNotSendOldValuesOnMaterializationIfOptionallyRequested() 
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, 
Materialized.as("store2"));
+
+        table2.enableSendingOldValues(true);

Review comment:
       As we enable this on the result `KTable`, and we enforce a 
materialization of the result, I would expect that we actually do get 
old-values sent?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void shouldNotSendOldValuesOnMaterializationIfOptionallyRequested() 
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, 
Materialized.as("store2"));
+
+        table2.enableSendingOldValues(true);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
     private void doTestSendingOldValue(final StreamsBuilder builder,
                                        final KTableImpl<String, Integer, 
Integer> table1,
                                        final KTableImpl<String, Integer, 
Integer> table2,
                                        final String topic1) {
-        table2.enableSendingOldValues();

Review comment:
       Should we actually keep this call within this method and pass in the 
boolean flag as parameter to `doTestSendingOldValue` ?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", 
consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void testSendingOldValuesSetIfMaterializedForced() {

Review comment:
       `shouldSetSendingOldValuesIfMaterializationForced`

##########
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##########
@@ -39,29 +39,27 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
     val sinkTopic = "sink"
 
     val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
-    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+    table.filter((key, value) => key.equals("a") && value == 
1).toStream.to(sinkTopic)

Review comment:
       Why do we need to change the filter condition?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to