jaagupku opened a new issue, #36284:
URL: https://github.com/apache/beam/issues/36284
### What happened?
Hi,
Beam version 2.68.0
I have a Kafka pipeline where I read messages from Kafka and commit offsets
back to Kafka only after they have been processed successfully. If the
processing `DoFn` fails during setup, for example because of a database
connection error, some messages can end up being skipped from processing. In
`processElement` I track the minimum offset seen for each partition, and in
`finishBundle` I raise an exception if that minimum offset is greater than the
expected (last committed) offset. Once this happens, the issue persists until
the pipeline is manually restarted, after which messages are read again from
the last committed offset.
If the offset is committed instead of raising the exception, some messages are
skipped from processing.
The pipeline is defined as:
```java
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-bootstrap-server:9092");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Pipeline.create(options)
    .apply("ReadFromKafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers("my-bootstrap-server:9092")
        .withTopic("topic")
        .withConsumerConfigUpdates(consumerProperties)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()))
    .apply(ParDo.of(new ProcessRecordsDoFn(consumerProperties)));
```
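(Note: the `DoFn` below also constructs its own `KafkaConsumer` from this `consumerProperties` map. A stand-alone consumer needs the deserializers set explicitly, so in practice the map presumably also contains entries like the following; this is an assumption, as it is not part of the snippet above.)
```java
// Assumed extra entries: a plain KafkaConsumer does not pick up the deserializers
// that KafkaIO configures via withKeyDeserializer/withValueDeserializerAndCoder.
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
```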
The ProcessRecordsDoFn:
```java
public class ProcessRecordsDoFn extends DoFn<KafkaRecord<String, byte[]>, Void> {
  private Map<String, Object> consumerProperties;
  private Map<TopicPartition, Long> minOffsets;
  private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
  private Map<TopicPartition, OffsetAndMetadata> expectedOffsets;
  private transient KafkaConsumer<String, byte[]> consumer;
  private transient Connection connection;

  protected ProcessRecordsDoFn(Map<String, Object> consumerProperties) {
    this.consumerProperties = consumerProperties;
  }

  @Setup
  public void setup() throws IOException {
    consumer = new KafkaConsumer<>(consumerProperties);
    // ISSUE: Failures here cause KafkaRecords to be skipped.
    // Example that demonstrates the problem:
    if (Math.random() < 0.5) {
      throw new RuntimeException("Simulated setup failure");
    }
  }
  @StartBundle
  public void startBundle(StartBundleContext processContext) {
    ImportJobOptions options = processContext.getPipelineOptions().as(ImportJobOptions.class);
    String topic = options.getKafkaTopics();
    Set<TopicPartition> allPartitions = new HashSet<>();
    for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
      allPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
    }
    expectedOffsets = consumer.committed(allPartitions);
    offsetsToCommit = new HashMap<>();
    minOffsets = new HashMap<>();
  }
  @ProcessElement
  public void processElement(@Element KafkaRecord<String, byte[]> element) throws IOException {
    // Process element here
    // ...
    // Element processed
    TopicPartition topicPartition = new TopicPartition(element.getTopic(), element.getPartition());
    long elementOffset = element.getOffset();
    // Track the minimum offset of elements processed in this bundle for each partition
    if (elementOffset <= minOffsets.getOrDefault(topicPartition, Long.MAX_VALUE)) {
      minOffsets.put(topicPartition, elementOffset);
    }
    // Track offsets to commit for each partition
    if ((elementOffset + 1) >= offsetsToCommit.getOrDefault(topicPartition, new OffsetAndMetadata(0)).offset()) {
      offsetsToCommit.put(topicPartition, new OffsetAndMetadata(elementOffset + 1));
    }
  }
  @FinishBundle
  public void finishBundle(FinishBundleContext processContext) throws Exception {
    // Validate that the minimum offsets processed in this bundle match the expected offsets
    for (TopicPartition partition : offsetsToCommit.keySet()) {
      if (minOffsets.get(partition) != expectedOffsets.get(partition).offset()) {
        // ISSUE: The pipeline eventually gets stuck where the minimum offset is greater
        // than the expected offset. If in that case the offset is committed, then some
        // messages are skipped.
        throw new RuntimeException(
            "Minimum offset " + minOffsets.get(partition)
                + " does not match expected offset " + expectedOffsets.get(partition).offset()
                + " for partition " + partition);
      }
    }
    consumer.commitSync(offsetsToCommit);
  }
  @Teardown
  public void teardown() throws IOException {
    if (consumer != null) {
      consumer.close();
      consumer = null;
    }
  }
}
```
In summary, the issue is that failures in a downstream `DoFn` cause a gap in the processed
messages.
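For comparison, here is a minimal sketch of the same read using KafkaIO's built-in `commitOffsetsInFinalize()` instead of committing from the downstream `DoFn`. This is an alternative configuration, not what the pipeline above does, and whether it avoids the gap depends on the runner's bundle finalization:
```java
// Sketch: let KafkaIO commit offsets after bundle finalization instead of
// committing manually from a downstream DoFn. Assumes the same topic and
// consumerProperties as above.
Pipeline.create(options)
    .apply("ReadFromKafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers("my-bootstrap-server:9092")
        .withTopic("topic")
        .withConsumerConfigUpdates(consumerProperties)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
        .commitOffsetsInFinalize()) // offsets committed by KafkaIO, not the DoFn
    .apply(ParDo.of(new ProcessRecordsDoFn(consumerProperties)));
```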
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner