Sounds like https://issues.apache.org/jira/browse/KAFKA-16394

-Matthias

On 8/29/24 02:35, Vogel, Kevin, DP EPS, BN, extern, external wrote:
Hello there,

I searched the Apache Jira for a bug report on this topic but couldn't find 
one. Maybe someone else has noticed something similar or knows more about this.

After we updated the kafka-streams dependency of our Spring Boot Kafka Streams 
application from 3.6.2 to 3.7.1, we noticed some failing tests. The expected 
behavior of the streams processor is to join two input topics and produce a 
single output record. The test then injects a third record with a null value 
and expects one more output record with a null value. Since the update, 
however, we get two records with a null value instead of one.

I stripped down the processor to the absolute minimum, so you have some example 
code. Don't worry about the strange names. We are working with a legacy system 
and are unfortunately bound to them:

@Bean
public BiFunction<
     KStream<Gidpf01iKey, Gidpf01iValue>,
     KTable<String, Gidpf01gAggregate>,
     KStream<UuidString, Teil>>
processTeil() {
   return (gidpf01i, gidpf01gAggregate) -> {
     var gidpf01iTable =
         gidpf01i
             .toTable();

     var joined =
         gidpf01iTable.leftJoin(
             gidpf01gAggregate,
             new Gidpf01iGidpf01gAggregateForeignKeyExtractor(),
             new Gidpf01iGidpf01gAggregateValueJoiner(),
             TableJoined.as("gidpf01i-gidpf01g-aggregate-to-teil"));

     return joined.toStream().selectKey(new GenericSbamUuidGenerator<>());
   };
}

As you can see, it is nothing fancy, just a KTable-KTable foreign-key left 
join. Even stranger, not all of our streams processors behave differently 
after the update, although they are all very similar and I cannot see any 
significant difference between them.

The test looks like this:

   @Test
   void testProcessGidpf01iTombstone() {

     // Key and value for part master data and part text
     final var gidpf01i = createGidpf01iKeyValue("A", REFRESH);
     final var gidpf01gAggregate =
         createGidpf01gAggregate("T123456789", "Test Beschreibung", "Test Zusatzinformation");

     gipf01gAggregateInputTopic.pipeInput(gidpf01gAggregate);
     gidpf01iInputTopic.pipeInput(gidpf01i);

     // Read the output record
     var results = teilOutputTopic.readKeyValuesToList();
     assertThat(results).hasSize(1);
     assertThat(results.get(0).key).isInstanceOf(UuidString.class);
     assertThat(results.get(0).value).isNotNull();

     // Send a tombstone for GIDPF01I
     gidpf01iInputTopic.pipeInput(new TestRecord<>(gidpf01i.key(), null));

     results = teilOutputTopic.readKeyValuesToList();
     // Fails here: results contains two identical records with a null value instead of one
     assertThat(results).hasSize(1);
     assertThat(results.get(0).key).isInstanceOf(UuidString.class);
     assertThat(results.get(0).value).isNull();

     assertThat(gidpf01iDlqOutputTopic.isEmpty()).isTrue();
   }

Does anyone know of a bug or behavioral change like this in Kafka Streams 
3.7.1? I would be very grateful for any response.

Kind Regards
Kevin Vogel
External Software Developer
Employee of Qvest Digital AG


