yelianevich commented on issue #31085:
URL: https://github.com/apache/beam/issues/31085#issuecomment-2126656952

   > I don't know if there are any workarounds, as the described behavior seems to be (unknown) bug. It needs further investigation. Could you please provide a simplified pipeline that is affected by this?
   
   I could easily reproduce it locally with a Confluent Kafka 7.6.0 test container and Flink 1.15.4.
   I ran this test with Beam 2.51.0 (the last version that works) and 2.56.0 (which always fails).
   
   Here is the test case; see the comments in the code.
   ```java
       @Test
       void testBeamFromKafkaSourcesIssue() throws Exception {
           // this topic receives data
           String topicFull = "topic-in-1";

           // this topic is empty - the main ingredient to reproduce the issue
           // if I remove it from the input - it works on 2.56.0
           String topicEmpty = "topic-in-2";

           try (AdminClient adminClient =
                   KafkaAdminClient.create(kafkaProperties.buildAdminProperties())) {
               adminClient
                       .createTopics(List.of(
                               new NewTopic(topicFull, 3, (short) 1),
                               new NewTopic(topicEmpty, 3, (short) 1)
                       ))
                       .all()
                       .get(5, TimeUnit.SECONDS);
           }

           try (KafkaProducer<String, String> producer =
                   new KafkaProducer<>(kafkaProperties.buildProducerProperties())) {
               producer.send(new ProducerRecord<>(topicFull, 0, null, "payload-0")).get();
               producer.send(new ProducerRecord<>(topicFull, 1, null, "payload-11")).get();
               producer.send(new ProducerRecord<>(topicFull, 2, null, "payload-222")).get();
               producer.send(new ProducerRecord<>(topicFull, 0, null, "payload-0")).get();
           }

           PipelineOptions opts = PipelineOptionsFactory.create();
           opts.setRunner(TestFlinkRunner.class);

           Pipeline pipeline = Pipeline.create(opts);

           String bootstrapServers = String.join(",", kafkaProperties.getBootstrapServers());

           PCollection<KafkaRecord<String, String>> readFullTopic = pipeline
                   .apply("ReadTopic1", createReader(topicFull, bootstrapServers));
           PCollection<KafkaRecord<String, String>> readEmptyTopic = pipeline
                   .apply("ReadTopic2", createReader(topicEmpty, bootstrapServers));

           PCollectionList.of(List.of(readFullTopic, readEmptyTopic))
                   .apply("Flatten", Flatten.pCollections())
                   .apply("ToString", MapElements.into(strings()).via(r -> r.getKV().getValue()))
                   .apply("LogInput", ParDo.of(LogContext.of("Input")))
                   .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(3))))
                   .apply("Count", Count.perElement())
                   .apply("LogOutput", ParDo.of(LogContext.of("Counts")));

           pipeline.run();
       }

       private static KafkaIO.Read<String, String> createReader(String kafkaTopic, String bootstrapServers) {
           return KafkaIO.<String, String>read()
                   .withBootstrapServers(bootstrapServers)
                   .withTopic(kafkaTopic)
                   .withKeyDeserializer(StringDeserializer.class)
                   .withValueDeserializer(StringDeserializer.class)
                   .withConsumerConfigUpdates(Map.of(
                           AUTO_OFFSET_RESET_CONFIG, "earliest"
                   ));
       }

       @AllArgsConstructor(staticName = "of")
       static class LogContext<T> extends DoFn<T, T> {
           private final String prefix;

           @ProcessElement
           public void processElement(ProcessContext c) {
               System.out.printf("%s: Element: %s, pane: %s, ts: %s%n",
                       prefix, c.element(), c.pane(), c.timestamp());
               c.output(c.element());
           }
       }
   ```
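   For completeness: the test relies on a `kafkaProperties` helper and a Kafka test container that I did not paste here. If you want to reproduce it without that fixture, a roughly equivalent, self-contained setup using Testcontainers could look like the sketch below (the `KafkaTestSupport` class and its method names are illustrative stand-ins, not the exact code I run):
   ```java
   // Illustrative stand-in for the kafkaProperties fixture referenced in the test above.
   // Assumes Testcontainers (org.testcontainers:kafka) and the plain Kafka client libraries.
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.common.serialization.StringSerializer;
   import org.testcontainers.containers.KafkaContainer;
   import org.testcontainers.utility.DockerImageName;

   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   class KafkaTestSupport {
       // Confluent Kafka 7.6.0, the version mentioned above.
       private final KafkaContainer kafka =
               new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

       void start() {
           kafka.start();
       }

       void stop() {
           kafka.stop();
       }

       // Returned as a list so it works with the String.join(",", ...) call in the test.
       List<String> getBootstrapServers() {
           return List.of(kafka.getBootstrapServers());
       }

       // Minimal admin config: only bootstrap servers are needed for createTopics().
       Map<String, Object> buildAdminProperties() {
           Map<String, Object> props = new HashMap<>();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
           return props;
       }

       // Producer config with String serializers, matching KafkaProducer<String, String>.
       Map<String, Object> buildProducerProperties() {
           Map<String, Object> props = new HashMap<>();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           return props;
       }
   }
   ```
   With something like that in place, `kafkaProperties` in the test is just an instance of this class, started before the test and stopped afterwards.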
   
   Output from `LogContext`:
   2.56.0 (the window never fires, so no counts are ever output):
   ```
   Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.674Z
   Input: Element: payload-11, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.695Z
   Input: Element: payload-222, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.696Z
   Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.696Z
   ```
   
   2.51.0 (expected behavior):
   ```
   Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.119Z
   Input: Element: payload-11, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.140Z
   Input: Element: payload-222, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.141Z
   Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.141Z
   Counts: Element: KV{payload-0, 2}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
   Counts: Element: KV{payload-222, 1}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
   Counts: Element: KV{payload-11, 1}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
   ```

