[ https://issues.apache.org/jira/browse/KAFKA-10077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy Coates updated KAFKA-10077:
--------------------------------
    Description:
Adding a `filter` call downstream of anything that has a state store, e.g. a table source, results in spurious tombstones being emitted from the topology for any key where a new entry doesn't match the filter, _even when no previous value existed for the row_.

To put this another way: a filter downstream of a state store will output a tombstone on an INSERT that doesn't match the filter, when it should only output a tombstone on an UPDATE.

This code shows the problem:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();

// Table source (backed by a state store) followed by a filter that keeps even values.
builder
    .table("table", Materialized.with(Serdes.Long(), Serdes.Long()))
    .filter((k, v) -> v % 2 == 0)
    .toStream()
    .to("bob");

final Topology topology = builder.build();

final Properties props = new Properties();
props.put("application.id", "fred");
props.put("bootstrap.servers", "who cares");

final TopologyTestDriver driver = new TopologyTestDriver(topology, props);

final TestInputTopic<Long, Long> input = driver
    .createInputTopic("table", Serdes.Long().serializer(), Serdes.Long().serializer());

input.pipeInput(1L, 2L); // INSERT of key 1, matches the filter
input.pipeInput(1L, 1L); // UPDATE of key 1, no longer matches
input.pipeInput(2L, 1L); // INSERT of key 2, never matched

final TestOutputTopic<Long, Long> output = driver
    .createOutputTopic("bob", Serdes.Long().deserializer(), Serdes.Long().deserializer());

final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList();

// keyValues contains:
// 1 -> 2
// 1 -> null <-- correct tombstone: deletes previous row.
// 2 -> null <-- spurious tombstone: no previous row.
{code}
These spurious tombstones can cause a LOT of noise when, for example, the filter is looking for a specific key. In such a situation, _every input record that does not have that key results in a tombstone!_ As a result, tombstones can far outnumber useful data.
I believe the fix is to turn on {{KTableImpl::enableSendingOldValues}} for any filter that is downstream of a state store.
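To illustrate why access to the old value matters, here is a minimal sketch in plain Java. It is a toy model, not Kafka Streams code: the class `TableFilterSketch`, the method `run`, and the `useOldValues` flag are hypothetical names introduced only for this example, and a `HashMap` stands in for the upstream state store. It assumes the filter can suppress a tombstone only when it knows whether the previous value passed the predicate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

/** Toy model of a table filter. NOT Kafka Streams code; all names are hypothetical. */
public class TableFilterSketch {

    /** Returns the records the modelled filter would forward, rendered as "key -> value". */
    public static List<String> run(long[][] inputs, LongPredicate predicate, boolean useOldValues) {
        Map<Long, Long> store = new HashMap<>(); // stands in for the upstream state store
        List<String> output = new ArrayList<>();

        for (long[] record : inputs) {
            long key = record[0];
            long newValue = record[1];
            Long oldValue = store.put(key, newValue); // null when the key is new (an INSERT)

            if (predicate.test(newValue)) {
                // New value matches: forward it as-is.
                output.add(key + " -> " + newValue);
            } else if (!useOldValues) {
                // Reported behaviour: the old value is unknown to the filter,
                // so every non-matching record becomes a tombstone.
                output.add(key + " -> null");
            } else if (oldValue != null && predicate.test(oldValue)) {
                // Assumed fixed behaviour: delete only a row that was emitted before.
                output.add(key + " -> null");
            }
            // Otherwise nothing was ever emitted for this key, so nothing is forwarded.
        }
        return output;
    }

    public static void main(String[] args) {
        long[][] inputs = {{1L, 2L}, {1L, 1L}, {2L, 1L}};
        LongPredicate isEven = v -> v % 2 == 0;

        System.out.println(run(inputs, isEven, false)); // [1 -> 2, 1 -> null, 2 -> null]
        System.out.println(run(inputs, isEven, true));  // [1 -> 2, 1 -> null]
    }
}
```

With the same three inputs as the reproduction above, the model drops only the tombstone for key 2 when old values are available, which is the behaviour the report asks for.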
> Filter downstream of state-store results in spurious tombstones
> ---------------------------------------------------------------
>
>                 Key: KAFKA-10077
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10077
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Andy Coates
>            Priority: Major