suraj-goel opened a new pull request, #18525:
URL: https://github.com/apache/druid/pull/18525

   Fixes #18476
   
   ### Description
   
   **Goal / summary**
   
   Add pre-ingestion header-based filtering for Kafka ingestion. The supervisor 
/ task `ioConfig` can now include an optional `headerBasedFilteringConfig` 
that is evaluated *before* parsing: records whose header values do **not** 
match the configured filter are dropped early. Offsets still advance so 
ingestion progress is preserved, and the standard ingestion metric 
`ingest/events/filtered` is emitted.
   
   **Key behavioral notes**
   
   - Only **`in`** (InDimFilter) filter specs are accepted — the config 
validates filter type and rejects unsupported filter types. The `in` filter’s 
`dimension` is interpreted as the header name and the `values` array as the 
inclusion set.  
   - Header bytes are decoded using a configurable encoding (default `UTF-8`), 
with a configurable Caffeine-backed cache for decoded strings (default 10,000 
entries) to avoid repeated decoding on hot values. Decode failures are logged 
and treated as `null`.  
   - Filtering happens in the `KafkaRecordSupplier.poll(...)` loop; when a 
record is filtered the code creates a lightweight record marked as filtered (so 
offsets advance) instead of parsing it.  
   
   #### Release note
   
   Pre-ingestion header filtering for Kafka: you can now add 
`headerBasedFilteringConfig` to the Kafka IO config to filter records *before* 
parsing. Only **`in`** filters are supported (the `dimension` field maps to the 
header name; `values` are the allowed header values). Records whose header 
value does **not** match the allowed values are **dropped** before ingestion. 
Filtered events are counted on the metric `ingest/events/filtered`.
   
   ---
   
   #### Behavioral / design decisions & rationale
   
   - **Pre-parse filtering**: avoids CPU/memory overhead for records that won’t 
be ingested.  
   - **Only `in` filter supported**: simplifies evaluation semantics and 
ensures predictable behavior. The filter `dimension` maps directly to the 
header name.  
   - **String decoding cache**: Caffeine cache prevents repeated expensive 
decoding of common header values.  
   - **Offset advancement**: filtered records are represented as lightweight 
placeholders so consumer offsets continue to advance.  
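
   The decode-and-match logic described above can be sketched roughly as follows. This is a simplified, hypothetical sketch: the class and method names are illustrative (the PR's actual evaluator is `KafkaHeaderBasedFilterEvaluator`), and a plain `HashMap` keyed on `ByteBuffer` stands in for the bounded Caffeine cache.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.Charset;
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   public class HeaderFilterSketch {
       private final String headerName;        // the in-filter's "dimension"
       private final Set<String> allowedValues; // the in-filter's "values"
       private final Charset encoding;          // default UTF-8 in the PR
       // Stand-in for the Caffeine cache of decoded header strings (default 10,000 entries).
       private final Map<ByteBuffer, String> decodeCache = new HashMap<>();

       public HeaderFilterSketch(String headerName, Set<String> allowedValues, Charset encoding) {
           this.headerName = headerName;
           this.allowedValues = allowedValues;
           this.encoding = encoding;
       }

       /** Returns true if the record should be ingested (header value is in the allowed set). */
       public boolean accept(Map<String, byte[]> headers) {
           byte[] raw = headers.get(headerName);
           String decoded = raw == null ? null : decodeCache.computeIfAbsent(
               ByteBuffer.wrap(raw),
               buf -> new String(raw, encoding));
           // Per the PR, decode failures are logged and treated as null;
           // a null header value never matches the inclusion set, so the record is dropped.
           return decoded != null && allowedValues.contains(decoded);
       }

       public static void main(String[] args) {
           HeaderFilterSketch filter = new HeaderFilterSketch(
               "tenant", Set.of("acme", "globex"), StandardCharsets.UTF_8);
           System.out.println(filter.accept(Map.of("tenant", "acme".getBytes(StandardCharsets.UTF_8))));  // true
           System.out.println(filter.accept(Map.of("tenant", "other".getBytes(StandardCharsets.UTF_8)))); // false
           System.out.println(filter.accept(Map.of()));  // false: header absent, treated as null
       }
   }
   ```

   `ByteBuffer` is used as the cache key because, unlike `byte[]`, it defines content-based `equals`/`hashCode`, so repeated occurrences of the same header bytes hit the cache.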
   
   ---
   
   ##### Key changed/added classes in this PR
    * `org.apache.druid.indexing.kafka.KafkaHeaderBasedFilterEvaluator` (new) — 
evaluator that decodes header bytes and evaluates the configured Druid `in` 
filter (uses Caffeine cache for decoded strings).  
    * 
`org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilteringConfig` 
(new) — config holder and validator (supports only `InDimFilter`, default 
encoding, default cache size).  
    * `org.apache.druid.indexing.kafka.KafkaRecordSupplier` (modified) — 
accepts header-based config, constructs evaluator, and skips/marks filtered 
records (while advancing offsets).  
    * `org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig` (modified) — 
stores `headerBasedFilteringConfig` and exposes getter; propagated into task 
record supplier.  
    * `org.apache.druid.indexing.kafka.KafkaIndexTask` (modified) — passes 
header-based config into `KafkaRecordSupplier`.  
    * `org.apache.druid.indexing.kafka.KafkaIndexTaskModule` (modified) — 
registers Jackson subtype for the new config.  
    * `org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig` 
(modified) — accepts and exposes `headerBasedFilteringConfig`.  
    * `org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor` (modified) — 
wires the config through supervisor task creation.  
    * Metric / meters classes (modified) — emit / account for 
`ingest/events/filtered` (see emitter defaults and 
`DropwizardRowIngestionMeters` / other row-ingestion meters).  
    * 
`org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord` 
(modified) — extended to represent filtered records so offsets advance even 
when records are dropped.  
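
   The "lightweight placeholder" idea behind the `OrderedPartitionableRecord` change can be sketched as below. All names here are hypothetical; the sketch only illustrates the design choice that a filtered record keeps its partition and offset (so the consumer position advances) while carrying no payload to parse.

   ```java
   public class FilteredRecordSketch {
       /** Hypothetical stand-in for a polled record that may be marked as filtered. */
       public record PolledRecord(int partition, long offset, byte[] payload, boolean filtered) {
           /** A placeholder with no payload: nothing to parse, but the offset is preserved. */
           static PolledRecord filteredAt(int partition, long offset) {
               return new PolledRecord(partition, offset, null, true);
           }
       }

       public static void main(String[] args) {
           // Poll-loop sketch: a matching record keeps its payload; a filtered one becomes a placeholder.
           PolledRecord kept = new PolledRecord(0, 41L, new byte[]{1, 2}, false);
           PolledRecord dropped = PolledRecord.filteredAt(0, 42L);
           // The next offset to consume advances past both records, filtered or not.
           long nextOffset = Math.max(kept.offset(), dropped.offset()) + 1;
           System.out.println(nextOffset);  // 43
       }
   }
   ```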
   
   ---
   
   ### Checklist
   
   This PR has:
   
   - [ ] been self-reviewed.
     - [ ] using the concurrency checklist (n/a unless concurrency changes were 
added)
   - [ ] added documentation for new or modified features or behaviors.  
   - [ ] a release note entry in the PR description.  
   - [ ] added Javadocs for most classes and all non-trivial methods.  
   - [ ] added comments explaining the "why" and the intent of the code 
wherever it would not be obvious to an unfamiliar reader.  
   - [ ] added unit tests or modified existing tests to cover new code paths.  
   - [ ] added integration tests.  
   - [ ] been tested in a test Druid cluster.  
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

