Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/15102

Right now, since there are some arguments about how to handle various failures, I'm listing what I found via stress testing so we can discuss:

(1) Kafka APIs fail because we cannot connect to the Kafka cluster. Solution: fail the query.

(2) `getOffset` fails temporarily because some topics are deleted at the same time; `consumer.position` will throw an NPE due to this race condition. Solution: retry (this is why `withRetries` is added; see the first sketch after this list).

```
consumer.poll(0)
val partitions = consumer.assignment()
consumer.position(p)
```

(3) In `getBatch`, some partitions are new because they are not in `fromOffsets`, so we call `fetchNewPartitionEarliestOffsets` to fetch their offsets. However, some of these new partitions may have been deleted in the meantime, so they won't appear in `consumer.assignment()` (see the second sketch below). Solution: log a warning.

(4) In `getBatch`, some partitions are new because they are not in `fromOffsets`, so we call `fetchNewPartitionEarliestOffsets` to fetch their offsets. Similar to (2), `consumer.position` may throw an NPE due to the same race condition. Solution: retry.

(5) Topics are deleted while a Spark job is running, which may cause `OffsetOutOfRangeException`. (I'm not sure whether there are more types of exceptions; this may need more investigation.) Solution: log a warning (see the last sketch below).

(6) A topic is deleted and then re-created. This may make `untilOffset` less than `fromOffset` (also covered in the last sketch). Solution: log a warning.
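For (2) and (4), here is a minimal sketch of what a `withRetries` helper could look like. The name comes from this PR, but the body, retry count, exception filter, and backoff below are assumptions for illustration, not the actual implementation:

```scala
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object KafkaOffsetRetrySketch {

  // Retry `body` while the topic-deletion race surfaces as an NPE from
  // `consumer.position`. The retry count and backoff are illustrative.
  @tailrec
  def withRetries[T](attemptsLeft: Int)(body: => T): T = {
    try {
      body
    } catch {
      case _: NullPointerException if attemptsLeft > 1 =>
        Thread.sleep(100) // transient race: back off briefly, then retry
        withRetries(attemptsLeft - 1)(body)
    }
  }

  // The poll/assignment/position sequence from (2), wrapped so the whole
  // sequence is re-run when the race is hit.
  def fetchCurrentOffsets(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] =
    withRetries(attemptsLeft = 3) {
      consumer.poll(0)
      consumer.assignment().asScala.map(p => (p, consumer.position(p))).toMap
    }
}
```

Retrying the whole poll/assignment/position sequence, rather than just the failing `position` call, matters because the assignment itself may have changed by the time the NPE surfaces.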
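For (3), a hedged sketch of how deleted partitions could be tolerated inside something like `fetchNewPartitionEarliestOffsets` (the method name is from this PR; the body, the `println` stand-in for a real logger, and the Kafka 0.10-style `seekToBeginning(Collection)` call are assumptions):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Only partitions still present in `consumer.assignment()` get an earliest
// offset; the rest were deleted in the race window and are only warned about.
def fetchNewPartitionEarliestOffsets(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  consumer.poll(0)
  val assigned = consumer.assignment().asScala
  val (present, deleted) = newPartitions.partition(assigned.contains)
  if (deleted.nonEmpty) {
    // Per (3): a concurrently deleted topic is not a query failure.
    println(s"WARN: new partitions already deleted, skipping: ${deleted.mkString(", ")}")
  }
  consumer.seekToBeginning(present.asJava)
  present.map(p => (p, consumer.position(p))).toMap
}
```

Per (4), the `consumer.position` calls here can still hit the same NPE race, so the whole body would be a candidate for wrapping in `withRetries` as well.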
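For (5) and (6), a sketch of the log-and-skip behavior. `OffsetOutOfRangeException` here is Kafka's `org.apache.kafka.clients.consumer.OffsetOutOfRangeException`; the method names and the `println` logging stand-in are illustrative assumptions, not this PR's code:

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
import org.apache.kafka.common.TopicPartition

// Per (6): a delete-then-recreate restarts the topic's offsets at 0, which can
// leave untilOffset < fromOffset. Skip the range with a warning instead of
// failing the query.
def validRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long): Option[(Long, Long)] =
  if (untilOffset < fromOffset) {
    println(s"WARN: $tp untilOffset ($untilOffset) < fromOffset ($fromOffset); " +
      "topic was likely deleted and re-created, skipping")
    None
  } else {
    Some((fromOffset, untilOffset))
  }

// Per (5): a topic deleted while the job runs can make the fetch go out of
// range; log it and return no records rather than killing the query.
def pollIgnoringDeletedTopics(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    pollTimeoutMs: Long): Iterable[ConsumerRecord[Array[Byte], Array[Byte]]] =
  try {
    consumer.poll(pollTimeoutMs).asScala
  } catch {
    case e: OffsetOutOfRangeException =>
      println(s"WARN: offsets out of range, topic may have been deleted: ${e.getMessage}")
      Iterable.empty
  }
```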