suraj-goel opened a new pull request, #18525:
URL: https://github.com/apache/druid/pull/18525
Fixes #18476
### Description
**Goal / summary**
Add pre-ingestion header-based filtering for Kafka ingestion. The supervisor
/ task `ioConfig` can now include an optional `headerBasedFilteringConfig`
which is evaluated *before* parsing: records whose header values do **not**
match the configured filter are dropped early, offsets are still advanced so
ingestion progress is preserved, and the standard ingestion metric
`ingest/events/filtered` is emitted.
Key behavioral notes:
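A hypothetical supervisor `ioConfig` snippet illustrating the shape described above. The nested field names (`filter`, `encoding`, `cacheSize`) and their placement are assumptions inferred from this description, not confirmed API:

```json
{
  "type": "kafka",
  "topic": "events",
  "consumerProperties": { "bootstrap.servers": "kafka01:9092" },
  "headerBasedFilteringConfig": {
    "filter": {
      "type": "in",
      "dimension": "event-type",
      "values": ["click", "purchase"]
    },
    "encoding": "UTF-8",
    "cacheSize": 10000
  }
}
```

Here `dimension` names the Kafka header to inspect and `values` is the inclusion set; records whose `event-type` header decodes to anything else would be dropped before parsing.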
- Only **`in`** (InDimFilter) filter specs are accepted; the config
validates the filter type and rejects anything else. The `in` filter's
`dimension` is interpreted as the header name and its `values` array as the
inclusion set.
- Header bytes are decoded using a configurable encoding (default `UTF-8`),
with a configurable Caffeine-backed cache for decoded strings (default 10,000
entries) to avoid repeated decoding on hot values. Decode failures are logged
and treated as `null`.
- Filtering happens in the `KafkaRecordSupplier.poll(...)` loop; when a
record is filtered the code creates a lightweight record marked as filtered (so
offsets advance) instead of parsing it.
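A minimal sketch of the decode-and-evaluate step described in the notes above. Class and method names are illustrative, not the PR's actual API; the real evaluator uses a bounded Caffeine cache, for which an unbounded `ConcurrentHashMap` stands in here to keep the sketch dependency-free:

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a header-based "in" filter evaluator (names are hypothetical). */
class HeaderInFilterEvaluator
{
  private final String headerName;
  private final Set<String> allowedValues;
  private final Charset charset;
  // Stand-in for the PR's bounded Caffeine cache of decoded header strings.
  private final Map<ByteBuffer, String> decodedCache = new ConcurrentHashMap<>();

  HeaderInFilterEvaluator(String headerName, Set<String> allowedValues, Charset charset)
  {
    this.headerName = headerName;
    this.allowedValues = allowedValues;
    this.charset = charset;
  }

  /** Returns true if the record should be kept, i.e. the header value is in the inclusion set. */
  boolean shouldKeep(Map<String, byte[]> headers)
  {
    byte[] raw = headers.get(headerName);
    String value = raw == null ? null : decode(raw);
    // Missing or undecodable headers evaluate to null and never match an "in" set.
    return value != null && allowedValues.contains(value);
  }

  private String decode(byte[] raw)
  {
    return decodedCache.computeIfAbsent(ByteBuffer.wrap(raw), buf -> {
      try {
        CharsetDecoder decoder = charset.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT);
        return decoder.decode(buf.duplicate()).toString();
      }
      catch (CharacterCodingException e) {
        // The PR logs decode failures and treats the value as null;
        // returning null from computeIfAbsent leaves the cache untouched.
        return null;
      }
    });
  }
}
```

Caching on the raw header bytes (wrapped in a `ByteBuffer` for content-based equality) avoids re-decoding hot values on every record, which is the point of the Caffeine cache in the actual change.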
#### Release note
Pre-ingestion header filtering for Kafka: you can now add
`headerBasedFilteringConfig` to Kafka IO config to filter records *before*
parsing. Only **`in`** filters are supported (the `dimension` field maps to the
header name; `values` are the allowed header values). Records whose header
value is not in the allowed set are **dropped** before ingestion. Dropped
events are counted in the `ingest/events/filtered` metric.
---
#### Behavioral / design decisions & rationale
- **Pre-parse filtering**: avoids CPU/memory overhead for records that won’t
be ingested.
- **Only `in` filter supported**: simplifies evaluation semantics and
ensures predictable behavior. The filter `dimension` maps directly to the
header name.
- **String decoding cache**: Caffeine cache prevents repeated expensive
decoding of common header values.
- **Offset advancement**: filtered records are represented as lightweight
placeholders so consumer offsets continue to advance.
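The offset-advancement decision can be sketched as follows. The record and poller types here are simplified stand-ins (the real code extends `OrderedPartitionableRecord` and filters inside `KafkaRecordSupplier.poll(...)`); names and fields are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Illustrative stand-in for OrderedPartitionableRecord (fields are hypothetical). */
record SimpleRecord(int partition, long offset, byte[] payload, Map<String, byte[]> headers, boolean filtered)
{
  static SimpleRecord filteredPlaceholder(int partition, long offset)
  {
    // No payload is retained: the placeholder exists only so the offset advances.
    return new SimpleRecord(partition, offset, null, Map.of(), true);
  }
}

/** Sketch of the poll-time filtering step. */
class FilteringPoller
{
  static List<SimpleRecord> filterPolled(List<SimpleRecord> polled, Predicate<Map<String, byte[]>> keep)
  {
    List<SimpleRecord> out = new ArrayList<>(polled.size());
    for (SimpleRecord r : polled) {
      if (keep.test(r.headers())) {
        out.add(r);
      } else {
        // Dropped pre-parse, but the partition/offset pair survives so the
        // consumer's committed position still moves past this record; this is
        // also where the ingest/events/filtered meter would be bumped.
        out.add(SimpleRecord.filteredPlaceholder(r.partition(), r.offset()));
      }
    }
    return out;
  }
}
```

Emitting a lightweight placeholder instead of silently skipping the record is what keeps offset bookkeeping identical whether or not filtering is enabled.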
---
##### Key changed/added classes in this PR
* `org.apache.druid.indexing.kafka.KafkaHeaderBasedFilterEvaluator` (new) —
evaluator that decodes header bytes and evaluates the configured Druid `in`
filter (uses Caffeine cache for decoded strings).
* `org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilteringConfig`
(new) — config holder and validator (supports only `InDimFilter`; applies the
default encoding and default cache size).
* `org.apache.druid.indexing.kafka.KafkaRecordSupplier` (modified) —
accepts header-based config, constructs evaluator, and skips/marks filtered
records (while advancing offsets).
* `org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig` (modified) —
stores `headerBasedFilteringConfig` and exposes getter; propagated into task
record supplier.
* `org.apache.druid.indexing.kafka.KafkaIndexTask` (modified) — passes
header-based config into `KafkaRecordSupplier`.
* `org.apache.druid.indexing.kafka.KafkaIndexTaskModule` (modified) —
registers Jackson subtype for the new config.
* `org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig`
(modified) — accepts and exposes `headerBasedFilteringConfig`.
* `org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor` (modified) —
wires the config through supervisor task creation.
* Metric / meters classes (modified) — emit / account for
`ingest/events/filtered` (see emitter defaults and
`DropwizardRowIngestionMeters` / other row-ingestion meters).
* `org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord`
(modified) — extended to represent filtered records so offsets advance even
when records are dropped.
---
### Checklist
This PR has:
- [ ] been self-reviewed.
- [ ] followed the concurrency checklist (n/a unless concurrency changes were
added).
- [ ] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [ ] added Javadocs for most classes and all non-trivial methods.
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]