mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164930944


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##########
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null 
? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void 
shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), 
builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null 
? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void 
shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null 
? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() 
{
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, Long> table2 = 
table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void 
shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned()
 {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> 
versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, Long> table2 = 
table1.groupBy(KeyValue::new).count(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, 
versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> 
v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void 
shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize3 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, 
versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> 
v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void 
shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, 
versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, 
v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void 
shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned()
 {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize3 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, 
versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, 
v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void 
shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize2 =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = 
table1.toStream().toTable(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        table1.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, 
TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) 
repartitionMap, true);
+    }
+
+    @Test
+    public void 
shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> versionedMaterialize =
+            new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, 
versionedMaterialize);
+        final KTable<String, String> table2 = table1.filter((k, v) -> v != 
null).mapValues(v -> v + v);
+        table2.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, 
TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) 
repartitionMap, true);
+    }
+
+    private void verifyVersionedSemantics(final TableFilterNode<?, ?> 
filterNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = 
filterNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableFilter);
+        final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) 
processorSupplier;
+        assertEquals(expectedValue, tableFilter.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> 
repartitionMapNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = 
repartitionMapNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableRepartitionMap);
+        final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = 
(KTableRepartitionMap<?, ?, ?, ?>) processorSupplier;
+        assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
         final Set<GraphNode> visited) {
 
-        if (currentNode.getClass().isAssignableFrom(clazz)) {
+        if (clazz.isAssignableFrom(currentNode.getClass())) {

Review Comment:
   Why is the `isAssignableFrom` check flipped (receiver and argument swapped)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to