mjsax commented on code in PR #13552: URL: https://github.com/apache/kafka/pull/13552#discussion_r1164930944
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java: ########## @@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() { assertEquals(count.get(), 2); } + @Test + public void shouldSetUseVersionedSemanticsOnTableFilter() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, materializedInternal); + table1.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldSetUseVersionedSemanticsWithIntermediateNode() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = table1.mapValues(v -> v != null ? 
v + v : null); + table2.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize = + new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = table1.mapValues(v -> v != null ? 
v + v : null, unversionedMaterialize); + table2.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false); + } + + @Test + public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = table1.mapValues(v -> v != null ? 
v + v : null, versionedMaterialize2); + table2.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(); + table2.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false); + } + + @Test + public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(versionedMaterialize2); + table2.filter((k, v) -> v != null); + + // When: + 
builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2); + final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2); + table3.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false); + } + + // not recommended to materialize join result as versioned since semantics are not correct, + // but this test is included anyway for completeness + @Test + public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2); + final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2, versionedMaterialize3); + table3.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2); + final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2); + table3.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false); + } + + // not recommended to materialize join result as versioned since semantics are not correct, + // but this test is included anyway for completeness + @Test + public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2); + final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2, versionedMaterialize3); + table3.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = table1.toStream().toTable(); + table2.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false); + } + + @Test + public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = table1.toStream().toTable(versionedMaterialize2); + table2.filter((k, v) -> v != null); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>()); + assertNotNull(filter); + verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true); + } + + @Test + public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, 
storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + table1.groupBy(KeyValue::new).count(); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>()); + assertNotNull(repartitionMap); + verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true); + } + + @Test + public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() { + // Given: + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize = + new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix); + final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize); + final KTable<String, String> table2 = table1.filter((k, v) -> v != null).mapValues(v -> v + v); + table2.groupBy(KeyValue::new).count(); + + // When: + builder.buildAndOptimizeTopology(); + + // Then: + final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>()); + assertNotNull(repartitionMap); + verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true); + } + + private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) { + final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier(); + assertTrue(processorSupplier instanceof KTableFilter); + final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) processorSupplier; + assertEquals(expectedValue, tableFilter.isUseVersionedSemantics()); + } + + private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> repartitionMapNode, final boolean expectedValue) { + final ProcessorSupplier<?, ?, ?, ?> processorSupplier = 
repartitionMapNode.processorParameters().processorSupplier(); + assertTrue(processorSupplier instanceof KTableRepartitionMap); + final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = (KTableRepartitionMap<?, ?, ?, ?>) processorSupplier; + assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics()); + } + private GraphNode getNodeByType( final GraphNode currentNode, final Class<? extends GraphNode> clazz, final Set<GraphNode> visited) { - if (currentNode.getClass().isAssignableFrom(clazz)) { + if (clazz.isAssignableFrom(currentNode.getClass())) { Review Comment: Why is this flipped? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org