jaagupku opened a new issue, #36284:
URL: https://github.com/apache/beam/issues/36284

   ### What happened?
   
   Hi,
   
   Beam version 2.68.0
   
   I have a Kafka pipeline where I read messages from Kafka and commit offsets back to Kafka only after the messages have been processed successfully. If the processing `DoFn`'s setup fails, for example because of a database connection error, some messages can be skipped. In `processElement` I track the minimum offset processed per partition, and in `finishBundle` I throw an exception if that minimum offset is greater than the expected offset (the last committed offset for the partition). Once this happens, the pipeline is stuck: every bundle fails the same check until the pipeline is manually restarted, after which messages are read again from the last committed offset.
   
   If the offset is committed in that situation instead of raising an exception, the messages between the expected offset and the minimum processed offset are skipped from processing. For illustration (numbers hypothetical): with a last committed offset of 100, a `@Setup` failure followed by a bundle retry can leave the first element seen by `processElement` at offset 250, so the pipeline either fails every bundle on `250 != 100` or silently loses offsets 100-249.
   
   The pipeline is defined as:
   ```java
   Map<String, Object> consumerProperties = new HashMap<>();
   consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-bootstrap-server:9092");
   consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
   consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
   consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
   Pipeline.create(options)
       .apply("ReadFromKafka", KafkaIO.<String, byte[]>read()
           .withBootstrapServers("my-bootstrap-server:9092")
           .withTopic("topic")
           .withConsumerConfigUpdates(consumerProperties)
           .withKeyDeserializer(StringDeserializer.class)
           .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()))
       .apply(ParDo.of(new ProcessRecordsDoFn(consumerProperties)));
   ```
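   For reference, KafkaIO also has a built-in `commitOffsetsInFinalize()` option that commits offsets as part of reader checkpoint finalization. When finalization happens relative to downstream processing is runner-dependent, so it does not clearly give the commit-only-after-successful-processing behavior described above; a minimal sketch of that variant, reusing the names from the snippet above, for comparison:
   ```java
   // Sketch for comparison: let KafkaIO commit offsets on checkpoint
   // finalization instead of committing manually from the DoFn.
   // Requires group.id in the consumer config (set above).
   Pipeline.create(options)
       .apply("ReadFromKafka", KafkaIO.<String, byte[]>read()
           .withBootstrapServers("my-bootstrap-server:9092")
           .withTopic("topic")
           .withConsumerConfigUpdates(consumerProperties)
           .withKeyDeserializer(StringDeserializer.class)
           .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
           .commitOffsetsInFinalize())
       .apply(ParDo.of(new ProcessRecordsDoFn(consumerProperties)));
   ```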
   The `ProcessRecordsDoFn`:
   ```java
   import java.io.IOException;
   import java.sql.Connection;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.beam.sdk.io.kafka.KafkaRecord;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.PartitionInfo;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   
   public class ProcessRecordsDoFn extends DoFn<KafkaRecord<String, byte[]>, Void> {
   
       private Map<String, Object> consumerProperties;
   
       // Per-partition bookkeeping, reset at the start of every bundle.
       private Map<TopicPartition, Long> minOffsets;
       private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
       private Map<TopicPartition, OffsetAndMetadata> expectedOffsets;
   
       private transient KafkaConsumer<String, byte[]> consumer;
       // Database connection whose setup can fail (see description above).
       private transient Connection connection;
   
       protected ProcessRecordsDoFn(Map<String, Object> consumerProperties) {
           this.consumerProperties = consumerProperties;
       }
   
       @Setup
       public void setup() throws IOException {
           // Deserializers passed explicitly since they are not part of consumerProperties.
           consumer = new KafkaConsumer<>(consumerProperties,
                   new StringDeserializer(), new ByteArrayDeserializer());
           // ISSUE: failures here cause KafkaRecords to be skipped.
           // Example that demonstrates the problem:
           if (Math.random() < 0.5) {
               throw new RuntimeException("Simulated setup failure");
           }
       }
   
       @StartBundle
       public void startBundle(StartBundleContext context) {
           ImportJobOptions options = context.getPipelineOptions().as(ImportJobOptions.class);
           String topic = options.getKafkaTopics();
           Set<TopicPartition> allPartitions = new HashSet<>();
   
           for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
               allPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
           }
           // Last committed offset per partition; this is where the next bundle
           // is expected to start.
           expectedOffsets = consumer.committed(allPartitions);
   
           offsetsToCommit = new HashMap<>();
           minOffsets = new HashMap<>();
       }
   
       @ProcessElement
       public void processElement(@Element KafkaRecord<String, byte[]> element) throws IOException {
           // Process element here
           // ...
           // Element processed
   
           TopicPartition topicPartition = new TopicPartition(element.getTopic(), element.getPartition());
           long elementOffset = element.getOffset();
   
           // Track the minimum offset processed in this bundle for each partition.
           if (elementOffset <= minOffsets.getOrDefault(topicPartition, Long.MAX_VALUE)) {
               minOffsets.put(topicPartition, elementOffset);
           }
           // Track the highest offset to commit (last processed offset plus one)
           // for each partition.
           if ((elementOffset + 1) >= offsetsToCommit.getOrDefault(topicPartition, new OffsetAndMetadata(0)).offset()) {
               offsetsToCommit.put(topicPartition, new OffsetAndMetadata(elementOffset + 1));
           }
       }
   
       @FinishBundle
       public void finishBundle(FinishBundleContext context) throws Exception {
           // Validate that the minimum offsets processed in this bundle match
           // the expected (last committed) offsets before committing.
           // NOTE: assumes the group has a committed offset for every partition.
           for (TopicPartition partition : offsetsToCommit.keySet()) {
               if (minOffsets.get(partition) != expectedOffsets.get(partition).offset()) {
                   // ISSUE: the pipeline eventually gets stuck here, with the
                   // minimum offset greater than the expected offset. If the
                   // offset is committed in that case instead, messages are skipped.
                   throw new RuntimeException("Minimum offset " + minOffsets.get(partition)
                           + " does not match expected offset " + expectedOffsets.get(partition).offset()
                           + " for partition " + partition);
               }
           }
   
           consumer.commitSync(offsetsToCommit);
       }
   
       @Teardown
       public void teardown() throws IOException {
           if (consumer != null) {
               consumer.close();
               consumer = null;
           }
       }
   }
   ```
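   The `ImportJobOptions` used in `startBundle` is a custom options interface; a minimal sketch with just the getter the `DoFn` relies on (the `@Description` text is illustrative):
   ```java
   import org.apache.beam.sdk.options.Description;
   import org.apache.beam.sdk.options.PipelineOptions;
   
   public interface ImportJobOptions extends PipelineOptions {
       @Description("Kafka topic to read from")
       String getKafkaTopics();
   
       void setKafkaTopics(String value);
   }
   ```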
   In summary, the issue is that failures in the downstream `DoFn`'s `@Setup` cause a gap in the processed messages.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [x] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

