nbali opened a new issue, #25962:
URL: https://github.com/apache/beam/issues/25962

   ### What happened?
   
   Scenario:
   Batch Kafka ingestion (`.withStartReadTime()`, `.withStopReadTime()`, experiments: `beam_fn_api`, `unsafely_attempt_to_process_unbounded_data_in_batch_mode`, `shuffle_mode=appliance`) reading a few billion records / a few hundred GB of Kafka data from a few hundred KTable topic partitions. Some of these partitions have a tendency to get stuck.
   
   The issue started happening with 2.45 (and still happens with 2.46), so my original idea was that some change introduced in 2.45 must have caused this. But after some investigation my guess is that the issue already existed and was previously just hidden/suppressed; some change merely revealed/emphasized it.
   
   So I think the problematic part is this: 
https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L383-L388
   
   What causes the issue is that `consumer.poll()` can return a `ConsumerRecords` that contains no `KafkaRecord` - so we assume we have reached the end - even though there are still records remaining to be consumed. This happens because, if you check the Kafka consumer implementation (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1259-L1276 and https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java#L102-L104 ), you can see that `poll()` considers a `Fetch` "successful" if the "position has advanced", even when `numRecords` is 0.
   
   I'm not completely sure this is the reason, but it seems entirely plausible to me: let's assume we have KTable Kafka topics with key compaction turned on. Let's also assume they have existed for a long time and have a very high compaction rate (only a few percent of the offsets contain a record). It is not hard to imagine huge ranges of offsets that contain no records at all. Well, this is the case with our topics.
   
   So what I see is that it tries to poll, gets back an "empty" `ConsumerRecords` (0 records, but `positionAdvanced` is true), and returns with a `ProcessContinuation.resume()`. This keeps repeating forever.
   
   The loop happens because we stop the current consumption with a return, and the next `processElement()` will `seek()` the consumer to the `startOffset` (aka `tracker.currentRestriction().getFrom()`, see https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L376 ), but in the previous `processElement()` we didn't notify the tracker of any progress at all, so the `startOffset` stays the same.
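   
   To make the cycle concrete, here is a heavily condensed paraphrase of the shape described above (not the actual code; names and error handling are simplified):
   
   ```java
   // Condensed paraphrase of the poll loop described above, not the real implementation.
   long startOffset = tracker.currentRestriction().getFrom();
   consumer.seek(topicPartition, startOffset);
   while (true) {
     ConsumerRecords<byte[], byte[]> rawRecords = poll(consumer, topicPartition);
     if (rawRecords.isEmpty()) {
       // Interpreted as "no data available right now" -> self-checkpoint and retry later.
       // The tracker was never advanced, so the next processElement() seeks back to the
       // very same startOffset, polls the same compaction gap, and gets "empty" again.
       return ProcessContinuation.resume();
     }
     for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
       if (!tracker.tryClaim(rawRecord.offset())) {
         return ProcessContinuation.stop();
       }
       // ... convert, output, advance the watermark, etc.
     }
   }
   ```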
   
   Essentially, what https://github.com/apache/beam/pull/24205 fixed for the watermark in streaming jobs also has to be fixed for the tracker/restriction/offset.
   
   In order to solve this we have to know whether the "empty" `ConsumerRecords` really marks the end of the stream, or is just a "gap".
   
   So these are the possible solutions I came up with:
   1. Contribute to [Kafka](https://github.com/apache/kafka) so the returned `ConsumerRecords` contains some distinction between the two kinds of "empty" (essentially exposing `positionAdvanced`, or possibly the consumer position). - _No idea if they would accept it, and even if they do, it would take the longest to land in Beam._
   2. Compare `consumer.position()` before and after `consumer.poll()` to figure out whether we have "advanced" or not (see the sketch after this list). - _This can detect the advancement, but I'm not sure whether the consumer's position always corresponds to the end of the returned range or can differ from it, i.e. whether it can also be used to update the tracker._
   3. Compare the returned `ConsumerRecords` to `ConsumerRecords.EMPTY`. - _I would say this is ugly, as it relies on an implementation detail of the Kafka library, but we can guard against them changing it with unit tests._
   4. Check if `((HasProgress) tracker).getProgress().getWorkRemaining()` is positive. - _I'm not sure how up-to-date that information is, but it can certainly indicate whether we are in a huge gap in the middle of processing._
   5. Check whether this is a BOUNDED or UNBOUNDED consumption, as batch pipelines should return with a `.stop()` when they reach the end of the range anyway. - _This would fix the issue only when consuming Kafka in batch pipelines, but it could most likely happen in streaming pipelines too, since they use the same SDF implementation._
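   
   To illustrate option 2, a rough sketch of the detection part (variable names like `positionBeforePoll` are mine; whether the post-poll position can safely be fed back into the tracker is exactly the open question above):
   
   ```java
   // Sketch of option 2 only; names are illustrative, not from the existing DoFn.
   long positionBeforePoll = consumer.position(topicPartition);
   ConsumerRecords<byte[], byte[]> rawRecords = poll(consumer, topicPartition);
   long positionAfterPoll = consumer.position(topicPartition);
   
   if (rawRecords.isEmpty() && positionAfterPoll <= positionBeforePoll) {
     // Genuinely no new data behind the current position; safe to self-checkpoint.
     return ProcessContinuation.resume();
   }
   // Otherwise we either got records, or we only crossed a compaction gap and should
   // keep polling (or advance the tracker and resume, see the sketch further below).
   ```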
   
   Once we know which case we have encountered, we can either continue the `while (true)` loop and `poll()` the next batch, or, if we have the position/offset, update the tracker and return with `.resume()`.
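   
   And a hedged fragment of that second branch ("update the tracker and resume"), reusing the hypothetical `positionAfterPoll` and the `startOffset` from the sketches above; whether claiming the consumer's position like this is always correct is the open question from option 2:
   
   ```java
   // Hedged sketch: claim everything before the post-poll position, so the residual
   // restriction handed to the next processElement() starts after the compaction gap.
   if (rawRecords.isEmpty() && positionAfterPoll > startOffset) {
     if (!tracker.tryClaim(positionAfterPoll - 1)) {
       // The restriction ends inside the gap; nothing left to do for this split.
       return ProcessContinuation.stop();
     }
     return ProcessContinuation.resume();
   }
   ```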
   
   FYI: increasing the batch/poll size via `ConsumerConfig.FETCH_MAX_BYTES_CONFIG`, `ConsumerConfig.MAX_POLL_RECORDS_CONFIG`, etc. alleviates the issue, but doesn't fix it completely.
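   
   For reference, this is roughly where those overrides go on the read transform (values below are placeholders, not a recommendation; `ImmutableMap` is Guava, a plain `HashMap<String, Object>` works just as well):
   
   ```java
   // Placeholder values, only to show where the consumer config overrides are applied;
   // .withStartReadTime()/.withStopReadTime() from the scenario above are omitted here.
   KafkaIO.readBytes()
       .withBootstrapServers("broker:9092")        // placeholder
       .withTopic("my-compacted-ktable-topic")     // placeholder
       .withConsumerConfigUpdates(
           ImmutableMap.<String, Object>of(
               ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024,
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50_000));
   ```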
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

