RaidenE1 opened a new pull request, #20325:
URL: https://github.com/apache/kafka/pull/20325
# Summary
This PR implements error caching for internal topic creation failures in Kafka Streams, allowing errors to be surfaced to users via the Streams group heartbeat status instead of only appearing in broker logs.
# Problem
Currently, when internal topic creation fails during Streams group heartbeat processing, the error is only written to the broker logs and is not exposed to users. As mentioned in the code comments around KafkaApis.scala:2857-2893, the result of the create topic call forwarded to the controller is not awaited, so if an internal topic fails to be created, users cannot see the specific reason for the failure.
# Solution
1. Error Caching in AutoTopicCreationManager
- Added CachedTopicCreationError case class to store error messages with timestamps
- Implemented getTopicCreationErrors() method with:
  - Lazy cleanup: Expired entries are removed during access based on configurable TTL
  - Size limits: Cache is limited to 1000 entries, with oldest entries removed when exceeded
  - TTL based on existing config: Uses 3 × request.timeout.ms (default: 90 seconds) for cache expiration
- Enhanced ControllerRequestCompletionHandler.onComplete() to parse CreateTopicsResponse and cache errors for failed topics only
- Added proper resource cleanup in close() method
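The cache behaviour listed above (timestamped entries, lazy TTL cleanup on access, and a size cap that evicts the oldest entries) can be sketched roughly as follows. This is an illustrative Java sketch, not the PR's Scala code: the class name `TopicCreationErrorCache`, its constructor parameters, and the explicit `nowMs` arguments are assumptions made so the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; the PR's real implementation lives in AutoTopicCreationManager.scala.
final class TopicCreationErrorCache {
    // Plays the role of CachedTopicCreationError: an error message plus the time it was recorded.
    static final class CachedError {
        final String message;
        final long timestampMs;

        CachedError(String message, long timestampMs) {
            this.message = message;
            this.timestampMs = timestampMs;
        }
    }

    private final ConcurrentHashMap<String, CachedError> errors = new ConcurrentHashMap<>();
    private final int maxEntries;
    private final long ttlMs;

    TopicCreationErrorCache(int maxEntries, long ttlMs) {
        this.maxEntries = maxEntries;
        this.ttlMs = ttlMs;
    }

    // Record a failure, then evict the oldest entries while the size limit is exceeded.
    void put(String topic, String message, long nowMs) {
        errors.put(topic, new CachedError(message, nowMs));
        while (errors.size() > maxEntries) {
            String oldestTopic = null;
            long oldestTs = Long.MAX_VALUE;
            for (Map.Entry<String, CachedError> e : errors.entrySet()) {
                if (oldestTopic == null || e.getValue().timestampMs < oldestTs) {
                    oldestTopic = e.getKey();
                    oldestTs = e.getValue().timestampMs;
                }
            }
            if (oldestTopic == null) {
                break;
            }
            errors.remove(oldestTopic);
        }
    }

    // Lazy cleanup: expired entries are dropped here, on access, not by a background thread.
    Map<String, String> getTopicCreationErrors(Set<String> topics, long nowMs) {
        errors.entrySet().removeIf(e -> nowMs - e.getValue().timestampMs > ttlMs);
        Map<String, String> result = new HashMap<>();
        for (String topic : topics) {
            CachedError cached = errors.get(topic);
            if (cached != null) {
                result.put(topic, cached.message);
            }
        }
        return result;
    }

    int size() {
        return errors.size();
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic to test; a real implementation would more likely read it from a clock abstraction.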
2. Integration with KafkaApis
- Enhanced Streams group heartbeat processing in KafkaApis.scala
- When MISSING_INTERNAL_TOPICS status is detected, query cached errors and append to status details
- Only query cache when Group Coordinator has already reported missing topics
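The integration step can be illustrated with a small Java sketch: the cache is consulted only when the status is MISSING_INTERNAL_TOPICS, and any cached errors are appended to the existing status detail. The `StatusCode` enum here is a local stand-in, and the method name `enrichDetail` and the "; Creation failed: ..." format are assumptions for illustration, not the format used in the PR.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical sketch of the heartbeat-side logic; names and message format are assumptions.
final class HeartbeatStatusSketch {
    // Local stand-in for the heartbeat status codes; only two values are modelled here.
    enum StatusCode { STALE_TOPOLOGY, MISSING_INTERNAL_TOPICS }

    // The supplier is only invoked when the coordinator already reported missing internal topics.
    static String enrichDetail(StatusCode code, String detail,
                               Supplier<Map<String, String>> cachedErrors) {
        if (code != StatusCode.MISSING_INTERNAL_TOPICS) {
            return detail;
        }
        Map<String, String> errors = cachedErrors.get();
        if (errors.isEmpty()) {
            return detail;
        }
        StringBuilder sb = new StringBuilder(detail).append("; Creation failed: ");
        String separator = "";
        // TreeMap gives a stable, sorted topic order in the rendered detail.
        for (Map.Entry<String, String> e : new TreeMap<>(errors).entrySet()) {
            sb.append(separator).append(e.getKey()).append(" (").append(e.getValue()).append(")");
            separator = ", ";
        }
        return sb.toString();
    }
}
```

Taking a `Supplier` rather than a ready-made map is one way to make the "only query when needed" rule explicit in the signature.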
3. Lifecycle Management
- Added autoTopicCreationManager.close() call in BrokerServer.shutdown() to ensure proper cleanup
# Key Features
- Thread-safe: Uses ConcurrentHashMap for concurrent access
- Bounded memory: TTL-based expiration and the size limit keep the cache from growing without bound
- Derived TTL: Based on the existing request.timeout.ms configuration (3× multiplier)
- Lazy cleanup: No background threads needed; cleanup happens during normal operation
- Selective caching: Only actual failures (errorCode != NONE) are cached; successful creations are ignored
- Comprehensive error handling: Handles authentication failures, version mismatches, and topic-specific errors
- Backward compatible: No changes to existing APIs or behavior
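The selective-caching rule (errorCode != NONE) can be shown with a short Java sketch. `TopicResult` is a simplified, hypothetical stand-in for a per-topic entry in a CreateTopicsResponse, and the fallback message for a missing error text is an assumption; the only Kafka fact relied on is that the NONE error code is 0.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: filter a create-topics result list down to the failures worth caching.
final class SelectiveCachingSketch {
    // Simplified stand-in for a per-topic result (name, error code, optional message).
    static final class TopicResult {
        final String name;
        final short errorCode;
        final String errorMessage;

        TopicResult(String name, short errorCode, String errorMessage) {
            this.name = name;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Kafka's NONE error code is 0; every other code is treated as a failure.
    static final short NONE = 0;

    static Map<String, String> failuresOnly(List<TopicResult> results) {
        Map<String, String> failures = new LinkedHashMap<>();
        for (TopicResult r : results) {
            if (r.errorCode != NONE) {
                // Fall back to the numeric code when no message was supplied (assumed behaviour).
                String message = r.errorMessage != null ? r.errorMessage : "error code " + r.errorCode;
                failures.put(r.name, message);
            }
        }
        return failures;
    }
}
```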
# Configuration
The error cache TTL is automatically calculated as 3 × request.timeout.ms:
- Default: 90 seconds (3 × 30s default request timeout)
- Configurable: Adjusts automatically when request.timeout.ms is modified
- Cache size limit: 1000 entries (hardcoded)
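As a worked example of the TTL arithmetic above, a minimal Java sketch (the class and constant names are hypothetical; only the 3× multiplier and the 30 s default come from this description):

```java
// Hypothetical helper showing how the cache TTL follows request.timeout.ms.
final class ErrorCacheTtl {
    static final int TTL_MULTIPLIER = 3;

    // Widen to long before multiplying so large timeouts cannot overflow int.
    static long ttlMs(int requestTimeoutMs) {
        return (long) TTL_MULTIPLIER * requestTimeoutMs;
    }
}
```

With the default request.timeout.ms of 30000, `ErrorCacheTtl.ttlMs(30000)` yields 90000 ms, i.e. the 90 seconds quoted above.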
# Testing
- Added comprehensive unit tests for error caching, TTL cleanup, and size limit management
- Added integration test for KafkaApis to verify end-to-end functionality
- Updated test cases to use realistic TTL values based on test configuration
- All existing tests pass without modification
# Code Changes
- AutoTopicCreationManager.scala: Added error caching functionality (~70 lines)
- KafkaApis.scala: Enhanced Streams heartbeat processing (~15 lines)
- BrokerServer.scala: Added cleanup call in shutdown (~2 lines)
- Test files: Added comprehensive test coverage (~170 lines)