FrankChen021 commented on code in PR #18525:
URL: https://github.com/apache/druid/pull/18525#discussion_r2460937336
##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle configuration enabled:
```
</details>
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their headers before ingestion, reducing the amount of data processed and improving ingestion performance. This is particularly useful when you want to ingest only a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for `headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, `UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header strings to cache in memory. Set to a higher value for better performance when processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest only records where the `environment` header has the value `production` or `staging`:
+
+<details>
+ <summary>Click to view the example</summary>
+
+```json
+{
+ "type": "kafka",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "metrics-kafka",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [],
+ "dimensionExclusions": [
+ "timestamp",
+ "value"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "name": "count",
+ "type": "count"
+ },
+ {
+ "name": "value_sum",
+ "fieldName": "value",
+ "type": "doubleSum"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "HOUR",
+ "queryGranularity": "NONE"
+ }
+ },
+ "ioConfig": {
+ "topic": "metrics",
+ "inputFormat": {
+ "type": "json"
+ },
+ "consumerProperties": {
+ "bootstrap.servers": "localhost:9092"
+ },
+ "headerBasedFilterConfig": {
+ "filter": {
+ "type": "in",
+ "dimension": "environment",
+ "values": ["production", "staging"]
+ },
+ "encoding": "UTF-8",
+ "stringDecodingCacheSize": 10000
+ },
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT1H"
+ },
+ "tuningConfig": {
+ "type": "kafka",
+ "maxRowsPerSegment": 5000000
+ }
+ }
+}
+```
+</details>
+
+In this example:
+- Only records with `environment` header values of `production` or `staging` will be ingested
Review Comment:
One more thing we need to state in the document: if a header has multiple values, the last one is used (based on the current implementation).
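The "last header wins" behavior can be illustrated with a small self-contained sketch (hypothetical class and record names; it simulates Kafka's `Headers.lastHeader(key)`, which returns the most recently added header with the given key, using a plain list instead of the real `Headers` type):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "last header wins": when a record carries several
// headers with the same key, the lookup returns the last one added, which
// is the behavior the review comment asks the docs to call out.
public class LastHeaderWins
{
  record Header(String key, String value) {}

  // Mirrors Headers.lastHeader(key): scan from the end, return the first match.
  static String lastHeader(List<Header> headers, String key)
  {
    for (int i = headers.size() - 1; i >= 0; i--) {
      if (headers.get(i).key().equals(key)) {
        return headers.get(i).value();
      }
    }
    return null;
  }

  public static void main(String[] args)
  {
    List<Header> headers = new ArrayList<>();
    headers.add(new Header("environment", "staging"));
    headers.add(new Header("environment", "production"));
    // The later value shadows the earlier one, as with Headers.lastHeader().
    System.out.println(lastHeader(headers, "environment")); // prints "production"
  }
}
```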
##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle configuration enabled:
```
</details>
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their headers before ingestion, reducing the amount of data processed and improving ingestion performance. This is particularly useful when you want to ingest only a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for `headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, `UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header strings to cache in memory. Set to a higher value for better performance when processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest only records where the `environment` header has the value `production` or `staging`:
+
+<details>
+ <summary>Click to view the example</summary>
+
+```json
+{
+ "type": "kafka",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "metrics-kafka",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [],
+ "dimensionExclusions": [
+ "timestamp",
+ "value"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "name": "count",
+ "type": "count"
+ },
+ {
+ "name": "value_sum",
+ "fieldName": "value",
+ "type": "doubleSum"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "HOUR",
+ "queryGranularity": "NONE"
+ }
+ },
+ "ioConfig": {
+ "topic": "metrics",
+ "inputFormat": {
+ "type": "json"
+ },
+ "consumerProperties": {
+ "bootstrap.servers": "localhost:9092"
+ },
+ "headerBasedFilterConfig": {
+ "filter": {
+ "type": "in",
+ "dimension": "environment",
+ "values": ["production", "staging"]
+ },
+ "encoding": "UTF-8",
+ "stringDecodingCacheSize": 10000
+ },
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT1H"
+ },
+ "tuningConfig": {
+ "type": "kafka",
+ "maxRowsPerSegment": 5000000
+ }
+ }
+}
+```
+</details>
+
+In this example:
+- Only records with `environment` header values of `production` or `staging` will be ingested
Review Comment:
Based on the implementation below, the description here is incomplete:
- records that do not have the `environment` header at all are also included
- records that have the `environment` header but a null header value are included
- records that have the `environment` header and a value, but whose value fails to decode, are also included

But from the semantics of the `filter` itself, I think these three behaviors are contradictory. My understanding is that only records that have the `environment` header with either the `production` or `staging` value should be included.
Correct me if I'm wrong.
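For comparison, a strict variant of the evaluation the reviewer describes might look like the following sketch (hypothetical `StrictHeaderFilter` class; headers are modeled as a plain map, with a null value standing in for a decode failure, rather than Kafka's `Headers` type):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical strict variant of evaluateInclusion(): a record is included
// only when the header exists, decodes successfully, and matches one of the
// filter values. Missing, null, or undecodable headers are excluded instead
// of being passed through permissively.
public class StrictHeaderFilter
{
  private final String headerName;
  private final Set<String> filterValues;

  public StrictHeaderFilter(String headerName, Set<String> filterValues)
  {
    this.headerName = headerName;
    this.filterValues = filterValues;
  }

  /** headers maps decoded header name to value; a null value means decode failure. */
  public boolean shouldInclude(Map<String, String> headers)
  {
    if (headers == null || !headers.containsKey(headerName)) {
      return false; // strict: record without the header is excluded
    }
    String value = headers.get(headerName);
    if (value == null) {
      return false; // strict: null or undecodable value is excluded
    }
    return filterValues.contains(value);
  }

  public static void main(String[] args)
  {
    StrictHeaderFilter filter = new StrictHeaderFilter("environment", Set.of("production", "staging"));
    System.out.println(filter.shouldInclude(Map.of("environment", "production"))); // true
    System.out.println(filter.shouldInclude(Map.of()));                            // false: header missing
  }
}
```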
##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle configuration enabled:
```
</details>
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their headers before ingestion, reducing the amount of data processed and improving ingestion performance. This is particularly useful when you want to ingest only a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for `headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to include based on header values. Only `in` filters are supported.|Yes||
Review Comment:
```suggestion
|`filter`|Object|A Druid filter specification, evaluated against header values; a record is included only if the filter evaluates to true. Only the `in` filter is supported.|Yes||
```
##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle configuration enabled:
```
</details>
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their headers before ingestion, reducing the amount of data processed and improving ingestion performance. This is particularly useful when you want to ingest only a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for `headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, `UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header strings to cache in memory. Set to a higher value for better performance when processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest only records where the `environment` header has the value `production` or `staging`:
+
+<details>
+ <summary>Click to view the example</summary>
+
+```json
+{
+ "type": "kafka",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "metrics-kafka",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [],
+ "dimensionExclusions": [
+ "timestamp",
+ "value"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "name": "count",
+ "type": "count"
+ },
+ {
+ "name": "value_sum",
+ "fieldName": "value",
+ "type": "doubleSum"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "HOUR",
+ "queryGranularity": "NONE"
+ }
+ },
+ "ioConfig": {
+ "topic": "metrics",
+ "inputFormat": {
+ "type": "json"
+ },
+ "consumerProperties": {
+ "bootstrap.servers": "localhost:9092"
+ },
+ "headerBasedFilterConfig": {
+ "filter": {
+ "type": "in",
+ "dimension": "environment",
Review Comment:
I think the name `dimension` here is ambiguous. A 'dimension' is a concept that applies after a record is deserialized, but here we're defining a filter that runs before deserialization. It confuses users, who may ask what the relationship is with the dimensions defined in `dimensionsSpec`.
A name such as `headerName` or `header` would be better.
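Under that suggestion, the filter spec might look like the following (hypothetical: `headerName` is the proposed replacement for `dimension`, not an existing option):

```json
"headerBasedFilterConfig": {
  "filter": {
    "type": "in",
    "headerName": "environment",
    "values": ["production", "staging"]
  }
}
```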
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterEvaluator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Evaluates Kafka header filters for pre-ingestion filtering.
+ */
+public class KafkaHeaderBasedFilterEvaluator
+{
+ private static final Logger log = new Logger(KafkaHeaderBasedFilterEvaluator.class);
+
+ private final Filter filter;
+ private final Charset encoding;
+ private final Cache<ByteBuffer, String> stringDecodingCache;
+ private final Set<String> filterValues;
+
+ public KafkaHeaderBasedFilterEvaluator(KafkaHeaderBasedFilterConfig headerBasedFilterConfig)
+ {
+ this.encoding = Charset.forName(headerBasedFilterConfig.getEncoding());
+ this.stringDecodingCache = Caffeine.newBuilder()
+ .maximumSize(headerBasedFilterConfig.getStringDecodingCacheSize())
+ .build();
+
+ this.filter = headerBasedFilterConfig.getFilter().toFilter();
+ if (!(filter instanceof InDimFilter)) {
+ // Only InDimFilter supported
+ throw new IllegalStateException("Unsupported filter type: " + filter.getClass().getSimpleName());
+ }
+
+ // Convert SortedSet to HashSet for O(1) lookups instead of O(log n) TreeSet lookups
+ InDimFilter inFilter = (InDimFilter) filter;
+ this.filterValues = new HashSet<>(inFilter.getValues());
+
+ log.info("Initialized Kafka header filter with encoding [%s] - direct evaluation for [%s] with Caffeine string cache (max %d entries) and HashSet lookup (%d filter values)",
+ headerBasedFilterConfig.getEncoding(),
+ this.filter.getClass().getSimpleName(),
+ headerBasedFilterConfig.getStringDecodingCacheSize(),
+ this.filterValues.size());
+ }
+
+
+ /**
+ * Evaluates whether a Kafka record should be included based on its headers.
+ *
+ * @param record the Kafka consumer record
+ * @return true if the record should be included, false if it should be filtered out
+ */
+ public boolean shouldIncludeRecord(ConsumerRecord<byte[], byte[]> record)
+ {
+ try {
+ return evaluateInclusion(record.headers());
+ }
+ catch (Exception e) {
+ log.warn(
+ e,
+ "Error evaluating header filter for record at topic [%s] partition [%d] offset [%d], including record",
+ record.topic(),
+ record.partition(),
+ record.offset()
+ );
+ return true; // Default to including record on error
+ }
+ }
+
+ private boolean evaluateInclusion(Headers headers)
+ {
+ InDimFilter inFilter = (InDimFilter) filter;
+
+ // Permissive behavior: missing headers result in inclusion
+ if (headers == null) {
+ return true;
+ }
+
+ Header header = headers.lastHeader(inFilter.getDimension());
+ // Permissive behavior: header is null or empty
+ if (header == null || header.value() == null) {
+ return true;
+ }
+
+ String headerValue = getDecodedHeaderValue(header.value());
+ // Permissive behavior: failed to decode header value
+ if (headerValue == null) {
+ return true;
+ }
+
+ return filterValues.contains(headerValue);
Review Comment:
I think it's a bit hard to add a new filter here. The implementation internally uses InDimFilter, but we don't actually use any methods or capabilities provided by the current filter mechanism.
I think a better way would be:
```java
interface Filter {
  String getHeader();
  boolean shouldInclude(String headerValue);
}

class InFilter implements Filter {
  private final String header;
  private final Set<String> values;

  public InFilter(String header, Set<String> values) {
    this.header = header;
    this.values = values;
  }

  @Override public String getHeader() { return header; }

  @Override public boolean shouldInclude(String headerValue) {
    return values.contains(headerValue);
  }
}
```
and in the `evaluateInclusion(Headers headers)` method, we just do something like:
```java
...
Header header = headers.lastHeader(filter.getHeader());
String headerValue = header == null ? null : getDecodedHeaderValue(header.value());
return this.filter.shouldInclude(headerValue);
```
This way, when we want to add a new filter, we only add a new implementation of Filter without changing any other part of the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.