FrankChen021 commented on code in PR #18525:
URL: https://github.com/apache/druid/pull/18525#discussion_r2460937336


##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle 
configuration enabled:
 ```
 </details>
 
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their 
headers before ingestion, reducing the amount of data processed and improving 
ingestion performance. This is particularly useful when you want to ingest only 
a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for 
`headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to 
include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. 
Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, 
`UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header 
strings to cache in memory. Set to a higher value for better performance when 
processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest 
only records where the `environment` header has the value `production` or 
`staging`:
+
+<details>
+  <summary>Click to view the example</summary>
+
+```json
+{
+  "type": "kafka",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "metrics-kafka",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "auto"
+      },
+      "dimensionsSpec": {
+        "dimensions": [],
+        "dimensionExclusions": [
+          "timestamp",
+          "value"
+        ]
+      },
+      "metricsSpec": [
+        {
+          "name": "count",
+          "type": "count"
+        },
+        {
+          "name": "value_sum",
+          "fieldName": "value",
+          "type": "doubleSum"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "HOUR",
+        "queryGranularity": "NONE"
+      }
+    },
+    "ioConfig": {
+      "topic": "metrics",
+      "inputFormat": {
+        "type": "json"
+      },
+      "consumerProperties": {
+        "bootstrap.servers": "localhost:9092"
+      },
+      "headerBasedFilterConfig": {
+        "filter": {
+          "type": "in",
+          "dimension": "environment",
+          "values": ["production", "staging"]
+        },
+        "encoding": "UTF-8",
+        "stringDecodingCacheSize": 10000
+      },
+      "taskCount": 1,
+      "replicas": 1,
+      "taskDuration": "PT1H"
+    },
+    "tuningConfig": {
+      "type": "kafka",
+      "maxRowsPerSegment": 5000000
+    }
+  }
+}
+```
+</details>
+
+In this example:
+- Only records with `environment` header values of `production` or `staging` 
will be ingested

Review Comment:
   one more we need to state in the document is that if there's multiple values 
of a header, the last header is used(based on current implementation)



##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle 
configuration enabled:
 ```
 </details>
 
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their 
headers before ingestion, reducing the amount of data processed and improving 
ingestion performance. This is particularly useful when you want to ingest only 
a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for 
`headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to 
include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. 
Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, 
`UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header 
strings to cache in memory. Set to a higher value for better performance when 
processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest 
only records where the `environment` header has the value `production` or 
`staging`:
+
+<details>
+  <summary>Click to view the example</summary>
+
+```json
+{
+  "type": "kafka",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "metrics-kafka",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "auto"
+      },
+      "dimensionsSpec": {
+        "dimensions": [],
+        "dimensionExclusions": [
+          "timestamp",
+          "value"
+        ]
+      },
+      "metricsSpec": [
+        {
+          "name": "count",
+          "type": "count"
+        },
+        {
+          "name": "value_sum",
+          "fieldName": "value",
+          "type": "doubleSum"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "HOUR",
+        "queryGranularity": "NONE"
+      }
+    },
+    "ioConfig": {
+      "topic": "metrics",
+      "inputFormat": {
+        "type": "json"
+      },
+      "consumerProperties": {
+        "bootstrap.servers": "localhost:9092"
+      },
+      "headerBasedFilterConfig": {
+        "filter": {
+          "type": "in",
+          "dimension": "environment",
+          "values": ["production", "staging"]
+        },
+        "encoding": "UTF-8",
+        "stringDecodingCacheSize": 10000
+      },
+      "taskCount": 1,
+      "replicas": 1,
+      "taskDuration": "PT1H"
+    },
+    "tuningConfig": {
+      "type": "kafka",
+      "maxRowsPerSegment": 5000000
+    }
+  }
+}
+```
+</details>
+
+In this example:
+- Only records with `environment` header values of `production` or `staging` 
will be ingested

Review Comment:
   based on the implementation below, the description here is not incomplete.
   - records that do not have the environment here are also included
   - records that have environment header but the header value is null are 
included
   - records that have environment header and value, but fail to decode value, 
are also included
   
   But from the `filter` itself semantic, I think these 3 are contradiction. My 
understanding is that only records that have environment header and has either 
production or staging value will be included.
   
   correct me if I'm wrong.



##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle 
configuration enabled:
 ```
 </details>
 
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their 
headers before ingestion, reducing the amount of data processed and improving 
ingestion performance. This is particularly useful when you want to ingest only 
a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for 
`headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to 
include based on header values. Only `in` filters are supported.|Yes||

Review Comment:
   ```suggestion
   |`filter`|Object|A Druid filter specification that defines which records to 
include if the evaluation result of this filter is true based on header values. 
Only `in` filter is supported.|Yes||
   ```



##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle 
configuration enabled:
 ```
 </details>
 
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their 
headers before ingestion, reducing the amount of data processed and improving 
ingestion performance. This is particularly useful when you want to ingest only 
a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for 
`headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to 
include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. 
Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, 
`UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header 
strings to cache in memory. Set to a higher value for better performance when 
processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest 
only records where the `environment` header has the value `production` or 
`staging`:
+
+<details>
+  <summary>Click to view the example</summary>
+
+```json
+{
+  "type": "kafka",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "metrics-kafka",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "auto"
+      },
+      "dimensionsSpec": {
+        "dimensions": [],
+        "dimensionExclusions": [
+          "timestamp",
+          "value"
+        ]
+      },
+      "metricsSpec": [
+        {
+          "name": "count",
+          "type": "count"
+        },
+        {
+          "name": "value_sum",
+          "fieldName": "value",
+          "type": "doubleSum"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "HOUR",
+        "queryGranularity": "NONE"
+      }
+    },
+    "ioConfig": {
+      "topic": "metrics",
+      "inputFormat": {
+        "type": "json"
+      },
+      "consumerProperties": {
+        "bootstrap.servers": "localhost:9092"
+      },
+      "headerBasedFilterConfig": {
+        "filter": {
+          "type": "in",
+          "dimension": "environment",

Review Comment:
   I think the name 'dimension' here is ambiguous. 'dimension' is a concept 
after a record is deserialized, but here we're defining a filter before 
deserialization, itconfuses users and they may ask what's the relationship with 
the dimension above defined in `dimensionSpec`.
   
   Name such as `headerName` or `header` is better



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterEvaluator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Evaluates Kafka header filters for pre-ingestion filtering.
+ */
+public class KafkaHeaderBasedFilterEvaluator
+{
+  private static final Logger log = new 
Logger(KafkaHeaderBasedFilterEvaluator.class);
+
+  private final Filter filter;
+  private final Charset encoding;
+  private final Cache<ByteBuffer, String> stringDecodingCache;
+  private final Set<String> filterValues;
+
+  public KafkaHeaderBasedFilterEvaluator(KafkaHeaderBasedFilterConfig 
headerBasedFilterConfig)
+  {
+    this.encoding = Charset.forName(headerBasedFilterConfig.getEncoding());
+    this.stringDecodingCache = Caffeine.newBuilder()
+        .maximumSize(headerBasedFilterConfig.getStringDecodingCacheSize())
+        .build();
+
+    this.filter = headerBasedFilterConfig.getFilter().toFilter();
+    if (!(filter instanceof InDimFilter)) {
+      // Only InDimFilter supported
+      throw new IllegalStateException("Unsupported filter type: " + 
filter.getClass().getSimpleName());
+    }
+
+    // Convert SortedSet to HashSet for O(1) lookups instead of O(log n) 
TreeSet lookups
+    InDimFilter inFilter = (InDimFilter) filter;
+    this.filterValues = new HashSet<>(inFilter.getValues());
+
+    log.info("Initialized Kafka header filter with encoding [%s] - direct 
evaluation for [%s] with Caffeine string cache (max %d entries) and HashSet 
lookup (%d filter values)",
+            headerBasedFilterConfig.getEncoding(),
+            this.filter.getClass().getSimpleName(),
+            headerBasedFilterConfig.getStringDecodingCacheSize(),
+            this.filterValues.size());
+  }
+
+
+  /**
+   * Evaluates whether a Kafka record should be included based on its headers.
+   *
+   * @param record the Kafka consumer record
+   * @return true if the record should be included, false if it should be 
filtered out
+   */
+  public boolean shouldIncludeRecord(ConsumerRecord<byte[], byte[]> record)
+  {
+    try {
+      return evaluateInclusion(record.headers());
+    }
+    catch (Exception e) {
+      log.warn(
+          e,
+          "Error evaluating header filter for record at topic [%s] partition 
[%d] offset [%d], including record",
+          record.topic(),
+          record.partition(),
+          record.offset()
+      );
+      return true; // Default to including record on error
+    }
+  }
+
+  private boolean evaluateInclusion(Headers headers)
+  {
+    InDimFilter inFilter = (InDimFilter) filter;
+
+    // Permissive behavior: missing headers result in inclusion
+    if (headers == null) {
+      return true;
+    }
+
+    Header header = headers.lastHeader(inFilter.getDimension());
+    // Permissive behavior: header is null or empty
+    if (header == null || header.value() == null) {
+      return true;
+    }
+
+    String headerValue = getDecodedHeaderValue(header.value());
+    // Permissive behavior: failed to decode header value
+    if (headerValue == null) {
+      return true;
+    }
+
+    return filterValues.contains(headerValue);

Review Comment:
   I think it's a little bit hard to add a new filter here. The implementation 
internally uses InDimFilter, but actually we don't use any methods or 
capability provided by current filter mechansim.
   
   I think a better way is that:
   
   
   ```java
   interface Filter {
      String getHeader();
       boolean shouldInclude(String headerValue)
   }
   
   class InFilter {
       public InFilter(String header, Set<String> values) {
       }
   
       public boolean shouldInclude(String headerValue) {
            return values.contains(headerValue);
       }
   }
   ```
   
   and in this `evaluateInclusion(Headers headers)` method, we just do sth like:
   ```java
   
   ...
   Header header = headers.lastHeader(filter.getHeader());
   
   String headerValue = getDecodedHeaderValue(header.value());
   
   return this.filter.shouldInclude(headerValue);
   ```
   
   By this way, when we want to add a new filter, we only add new 
implementation of Filter without changing any other parts of the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to