nbali opened a new issue, #25962: URL: https://github.com/apache/beam/issues/25962
### What happened?

Scenario: batch Kafka ingestion (`.withStartReadTime()`, `.withStopReadTime()`, experiments: `beam_fn_api`, `unsafely_attempt_to_process_unbounded_data_in_batch_mode`, `shuffle_mode=appliance`) reading a few billion Kafka records / a few hundred GB from a few hundred KTable topic partitions. Some of these partitions have a tendency to get stuck.

The issue started happening with 2.45 (and still does with 2.46), so my original idea was that some change introduced in 2.45 must have caused it... but after some investigation my guess is that the issue already existed and was merely hidden/suppressed before, and some change only revealed/emphasized it.

So I think the problematic part is this:
https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L383-L388

What causes the issue is that `consumer.poll` can return a `ConsumerRecords` that contains no `KafkaRecord` - so we assume we have reached the end - even though there are still remaining records to be consumed. This happens because, if you check the Kafka consumer implementation (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1259-L1276 and https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java#L102-L104 ), you can see that `poll()` will consider a `Fetch` "successful" if the "position has advanced", even if `numRecords` is 0.

I'm not completely sure this is the reason, but it seems entirely plausible to me: let's assume we have KTable Kafka topics with key compaction turned on. Let's assume they have existed for a long time and have a very high compaction rate (only a few percent of the offsets contain a record). It is not hard to imagine there being huge ranges of offsets that contain no records. Well, this is the case with our topics.

So what I see is that it tries to poll, gets back an "empty" `ConsumerRecords` (0 records, but `positionAdvanced` is true), and returns with a `ProcessContinuation.resume()`. This keeps repeating forever. The loop happens because we stop the current consumption with a return, and the next `processElement()` will `seek()` the consumer to the `startOffset` (aka `tracker.currentRestriction().getFrom()`, https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L376 ), but in the previous `processElement()` we didn't notify the tracker of the progress at all, so the `startOffset` stays the same.

Essentially, what was fixed for the watermark in streaming jobs with https://github.com/apache/beam/pull/24205 also has to be fixed for the tracker/restriction/offset.

In order to solve this, we have to know whether the "empty" `ConsumerRecords` really means we are at the end of the stream, or we are just in a "gap". These are the possible solutions I came up with:

1. Contribute to [Kafka](https://github.com/apache/kafka) so the returned `ConsumerRecords` contains some distinction between the two types of "empty" (essentially representing `positionAdvanced`, or possibly the consumer position).
   - _No idea if they would accept it, and even if they do, this would take the longest to get into Beam._
2. Compare `consumer.position()` before and after `consumer.poll()` to figure out whether we have "advanced" or not (see the sketch after this list).
   - _It can be used to detect the advancement, but I'm not sure whether the consumer's position always corresponds to the end of the returned range or might differ from it, i.e. whether it can be used to update the tracker or not._
3. Compare the returned `ConsumerRecords` to `ConsumerRecords.EMPTY`.
   - _I would say this is ugly, as it relies on an implementation detail of the Kafka library, but we could have unit tests that detect if they ever change it._
4. Check if `((HasProgress) tracker).getProgress().getWorkRemaining()` is positive.
   - _I'm not sure how up-to-date that information is, but it can certainly indicate that we are in a huge gap in the middle of the processing._
5. Check whether this is a BOUNDED or UNBOUNDED consumption, as batch pipelines should return with a `.stop()` when they reach the end of the range anyway.
   - _This would fix the issue only when consuming Kafka in batch pipelines, but it could most likely happen with streaming pipelines too, since they use the same SDF implementation._
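To illustrate option 2, here is a minimal, untested sketch of the comparison. The `pollAndDetectGap` helper and its return value are hypothetical, and it assumes the consumer is already assigned to the partition and sought to the restriction's start offset, as `ReadFromKafkaDoFn` does. Whether the post-poll position can safely be fed back into the tracker is exactly the open question noted above.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class CompactionGapCheck {

  /**
   * Polls once and distinguishes "no records because of a compaction gap" from
   * "no records because there is nothing more to fetch right now", by comparing
   * the consumer position before and after the poll.
   */
  static long pollAndDetectGap(
      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, Duration pollTimeout) {
    long positionBefore = consumer.position(topicPartition);
    ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
    long positionAfter = consumer.position(topicPartition);

    if (records.isEmpty() && positionAfter > positionBefore) {
      // The fetch "succeeded" but every offset in the fetched range was compacted away:
      // this is a gap, not the end of the partition. Instead of returning resume()
      // without any progress, the caller should keep polling (or record positionAfter
      // so the next processElement() does not seek back to the old startOffset).
    } else if (records.isEmpty()) {
      // The position did not move: there really is nothing more to read right now.
    }
    return positionAfter;
  }
}
```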
Once we know which case we have encountered, we can either continue the `while (true)` loop and `poll` the next batch, or, if we have the position/offset, update the tracker and return with `.resume()`.

FYI: increasing the batch/poll size via `ConsumerConfig.FETCH_MAX_BYTES_CONFIG`, `ConsumerConfig.MAX_POLL_RECORDS_CONFIG`, etc. alleviates the issue, but doesn't fix it completely (a config sketch follows below).
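For reference, a rough sketch of how those consumer settings can be passed through `KafkaIO` via `withConsumerConfigUpdates()`. The bootstrap servers, topic name, and numeric values are placeholders; larger fetches only make it more likely that each poll returns at least one surviving record, a big enough compaction gap can still come back empty.

```java
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LargerPollBatches {

  // Placeholder values: larger fetches reduce the chance that a poll covers only
  // compacted-away offsets, but they do not eliminate it.
  static KafkaIO.Read<byte[], byte[]> kafkaRead() {
    return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my-ktable-topic")
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(
            Map.<String, Object>of(
                ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024,
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50_000));
  }
}
```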
### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]