ramackri opened a new pull request, #1001:
URL: https://github.com/apache/ranger/pull/1001

   ## What changes were proposed in this pull request?
   
   This change closes Kafka producer and consumer configuration gaps in the 
audit-server stack:
   
   - **Ingestor producer** — wire `batch.size`, `linger.ms`, `buffer.memory`, 
`compression.type`, timeouts, and related `ProducerConfig` keys from site XML; 
`flush()` on shutdown; configurable REST batch send timeout.
   - **Dispatchers** — default to `CooperativeStickyAssignor`; document and 
ship consumer rebalance/poll timeout properties in Solr and HDFS site XML.
   - **Topic creation** — optionally apply `retention.ms`, `compression.type`, 
and `min.insync.replicas` when Ranger creates the `ranger_audits` topic.
   
   All new properties are **backward compatible**: code defaults apply when 
site XML omits them.
   
   ---
   
   ## End-to-end flow (Tier 3)
   
   ```text
   Ranger plugins  --REST-->  audit-ingestor (:7081)  --produce-->  Kafka 
(ranger_audits)
                                                                         |
                                         
+-------------------------------+-------------------------------+
                                         |                               |
                                         v                               v
                           ranger_audit_solr_dispatcher_group    
ranger_audit_hdfs_dispatcher_group
                                         |                               |
                                         v                               v
                                 Solr (ranger_audits)              HDFS 
(/ranger/audit/hdfs/...)
                                         |
                                         v
                           Ranger Admin → Audit → Access tab
   ```
   
   ---
   
   ## Gap analysis (before this change)
   
   ### Producer (`AuditProducer.java`)
   
   | Gap | Before | Risk |
   |-----|--------|------|
   | P1 producer props not wired | `batch.size`, `linger.ms`, `buffer.memory`, 
`compression.type`, `delivery.timeout.ms` hard-coded or Kafka defaults (32 KiB 
batch, 5 ms linger, no compression) | Lower throughput under burst; higher 
broker disk/network; no operator tuning via XML |
   | `request.timeout.ms` in XML but not on producer | Property existed in site 
XML; only used by AdminClient for topic init | Producer used Kafka default; 
inconsistent with documented config |
   | `sendBatch` timeout hard-coded | Fixed 30 s latch in code | Not adjustable 
for large batches or slow brokers |
   | No `flush()` on shutdown | `kafkaProducer.close()` only | In-flight 
records may be lost on graceful stop |
   | Topic configs at create time | `retention.ms`, `compression.type`, 
`min.insync.replicas` not applied by Ranger | Operators had to set topic 
configs out-of-band |
   
   ### Consumer (Solr / HDFS dispatchers)
   
   | Gap | Before | Risk |
   |-----|--------|------|
   | Default partition assignor | Kafka client default (`RangeAssignor`) | 
Stop-the-world rebalance on pod churn; more partition movement in K8s/Docker |
   | Consumer rebalance / poll timeouts | Partially documented; not 
consistently in site XML | `max.poll.interval.ms` exceeded when Solr batches 
are slow → consumer kicked from group |
   | `CooperativeStickyAssignor` not default | Not configured | Incremental 
cooperative rebalance unavailable despite Kafka 3.9 client |
   
   ---
   
   ## Code changes
   
   | File | Change |
   |------|--------|
   | `audit-common/.../AuditServerConstants.java` | Producer property names and 
defaults; topic config property names; default assignor → 
`CooperativeStickyAssignor` |
   | `audit-ingestor/.../AuditProducer.java` | `createProducerConfig()` wires 
all P1/P2 producer props; configurable batch send timeout; `flush()` on 
shutdown |
   | `audit-common/.../AuditMessageQueueUtils.java` | `buildTopicConfigs()` 
applies optional topic configs at create time |
   | `ranger-audit-ingestor-site.xml` | Producer tuning block + commented topic 
configs for production |
   | `ranger-audit-dispatcher-solr-site.xml` | Consumer rebalance/timeout 
properties + `CooperativeStickyAssignor` |
   | `ranger-audit-dispatcher-hdfs-site.xml` | Consumer rebalance/timeout 
properties + `CooperativeStickyAssignor` |
   | `AuditProducerTest.java` | Unit tests for `createProducerConfig()` |
   | `AuditMessageQueueUtilsTest.java` | Unit tests for `buildTopicConfigs()` |
   
   ---
   
   ## Configuration reference
   
   ### Ingestor producer
   
   **Prefix:** `ranger.audit.ingestor.kafka.producer.*`  
   **Config file:** 
`audit-ingestor/src/main/resources/conf/ranger-audit-ingestor-site.xml`
   
   | Property | Default | Kafka key | Notes |
   |----------|---------|-----------|-------|
   | `...producer.batch.size` | `131072` (128 KiB) | `batch.size` | 
65536–262144 for high volume |
   | `...producer.linger.ms` | `20` | `linger.ms` | Coalesce records into 
larger batches |
   | `...producer.buffer.memory` | `134217728` (128 MiB) | `buffer.memory` | 
Avoid `max.block.ms` backpressure |
   | `...producer.compression.type` | `lz4` | `compression.type` | ~40–50% 
savings on JSON audit payloads |
   | `...producer.delivery.timeout.ms` | `120000` | `delivery.timeout.ms` | 
Must be ≥ `request.timeout.ms` + linger |
   | `...producer.max.request.size` | `1048576` (1 MiB) | `max.request.size` | 
Must be ≤ broker `message.max.bytes` |
   | `...producer.max.block.ms` | `60000` | `max.block.ms` | Backpressure when 
buffer full |
   | `...producer.batch.send.timeout.ms` | `30000` | *(application)* | REST 
batch callback wait (not a Kafka key) |
   | `...kafka.request.timeout.ms` | `60000` | `request.timeout.ms` | Shared by 
producer and AdminClient |
   
   Producer also sets `enable.idempotence=true` and `acks=all` in code (not 
overridable via these properties).
   
   ### Optional topic configs (at create time)
   
   **Prefix:** `ranger.audit.ingestor.kafka.topic.*`  
   Applied only when set and non-blank. Uncomment in site XML for production 
clusters with RF ≥ 3.
   
   | Property | Suggested value | Kafka key |
   |----------|-----------------|-----------|
   | `...topic.retention.ms` | `604800000` (7 days) | `retention.ms` |
   | `...topic.compression.type` | `lz4` | `compression.type` |
   | `...topic.min.insync.replicas` | `2` | `min.insync.replicas` |
   
   Requires replication factor ≥ min ISR.
   
   ### Dispatcher consumers (Solr and HDFS)
   
   **Prefix:** `ranger.audit.dispatcher.*`  
   **Config files:**
   
   - 
`audit-dispatcher/dispatcher-solr/src/main/resources/conf/ranger-audit-dispatcher-solr-site.xml`
   - 
`audit-dispatcher/dispatcher-hdfs/src/main/resources/conf/ranger-audit-dispatcher-hdfs-site.xml`
   
   | Property | Default | Notes |
   |----------|---------|-------|
   | `partition.assignment.strategy` | 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor` | Incremental 
cooperative rebalance on pod churn |
   | `session.timeout.ms` | `60000` | Must be > 3 × `heartbeat.interval.ms` |
   | `heartbeat.interval.ms` | `10000` | Consumer liveness while idle |
   | `max.poll.interval.ms` | `300000` (5 min) | Increase if Solr indexing or 
HDFS writes exceed this window |
   | `max.poll.records` | `500` | Reduce to ~200 if `max.poll.interval.ms` is 
exceeded |
   
   ---
   
   ## Tuning guide
   
   ### High audit volume (producer)
   
   | Goal | Action |
   |------|--------|
   | Higher throughput | Increase `batch.size` (up to 262144) and `linger.ms` 
(10–50) |
   | Lower broker disk/network | Keep `compression.type=lz4` |
   | Avoid producer blocking | Increase `buffer.memory` (64–128 MiB) |
   | Slow broker / large batches | Raise `delivery.timeout.ms` and 
`batch.send.timeout.ms` together |
   
   ### Slow Solr or HDFS batches (consumer)
   
   | Symptom | Action |
   |---------|--------|
   | Consumer leaves group (`max.poll.interval.ms` exceeded) | Increase 
`max.poll.interval.ms` to 1.5–2× p99 batch processing time |
   | Frequent rebalance on deploy | Keep `CooperativeStickyAssignor`; expect 
one rebalance on first upgrade from `RangeAssignor` |
   | Long GC pauses | Reduce `max.poll.records` |
   
   ### Production Kafka cluster (RF = 3)
   
   Uncomment topic properties in `ranger-audit-ingestor-site.xml`:
   
   ```xml
   <property>
     <name>ranger.audit.ingestor.kafka.topic.retention.ms</name>
     <value>604800000</value>
   </property>
   <property>
     <name>ranger.audit.ingestor.kafka.topic.compression.type</name>
     <value>lz4</value>
   </property>
   <property>
     <name>ranger.audit.ingestor.kafka.topic.min.insync.replicas</name>
     <value>2</value>
   </property>
   ```
   
   ---
   
   ## Impact and rollback
   
   | Component | Impact |
   |-----------|--------|
   | **Audit ingestor** | Higher throughput under burst; lz4 compression; 
graceful shutdown flush. Defaults apply when properties omitted. |
   | **Solr / HDFS dispatchers** | Cooperative sticky rebalance; fewer 
disruptive revokes on container restart. One rebalance possible on first 
upgrade. |
   | **Topic creation** | Optional retention/compression/min ISR applied only 
when properties set. |
   | **Rollback** | Restore previous site XML and redeploy audit pods; no 
schema migration. |
   
   ---
   
   
   
   
   ## How was this patch tested?
   
   ## Testing
   
   ### Unit tests
   
   ```bash
   mvn test -pl audit-server/audit-common,audit-server/audit-ingestor \
     -Dcheckstyle.skip=true -Dpmd.skip=true -Drat.skip=true
   ```
   
   | Test class | Coverage |
   |------------|----------|
   | `AuditProducerTest` | `createProducerConfig()` — defaults, overrides, 
idempotent/acks wiring |
   | `AuditMessageQueueUtilsTest` | `buildTopicConfigs()` — retention, 
compression, min ISR |
   
   Compile (without full distro):
   
   ```bash
   mvn compile -pl 
audit-server/audit-common,audit-server/audit-ingestor,audit-server/audit-dispatcher/dispatcher-common
 \
     -DskipTests -Drat.skip=true
   ```
   
   ### Manual — Docker Tier 3 pipeline
   
   **Environment:** Docker Compose, Kerberos, Tier 3 stack (Admin, Postgres, 
Kafka, ZooKeeper, Solr, Hadoop/HDFS plugin, audit ingestor, Solr dispatcher, 
HDFS dispatcher).
   
   | Step | Expected result |
   |------|-----------------|
   | Ingestor health (`:7081`) | UP |
   | Solr dispatcher health (`:7091`) | UP |
   | HDFS dispatcher health (`:7092`) | UP |
   | HDFS access as Kerberos user | Solr `numFound` increases for that user |
   | Ranger Admin → Audit → Access | New event visible; matches Solr document |
   | HDFS audit path | Files under `/ranger/audit/hdfs/YYYYMMDD/` |
   | Kafka broker restart | Pipeline blocks during outage; recovers after 
broker is healthy |
   
   ## Build and deploy notes
   
   Tarballs are produced by the audit-server assembly modules. A full `mvn 
package -pl distro` may fail on unrelated modules (for example Kylin); build 
audit-server modules directly:
   
   ```bash
   mvn package -pl 
audit-server/audit-ingestor,audit-server/audit-dispatcher/dispatcher-solr,audit-server/audit-dispatcher/dispatcher-hdfs
 \
     -am -DskipTests -Drat.skip=true
   ```
   
   After changing site XML, redeploy the affected audit pod (ingestor and/or 
dispatchers) for properties to take effect. Producer and consumer client 
properties are read at JVM startup.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to