[ https://issues.apache.org/jira/browse/RANGER-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ramachandran Krishnan updated RANGER-5655:
------------------------------------------
    Description: 
Implement a *dynamic unified ingestor registry* for Ranger audit-ingestor so 
operators can change Kafka partition routing and per-repo service allowlists at 
runtime — without restarting ingestor pods.

The registry is stored in a Kafka compacted topic 
({{ranger_audit_partition_plan}}) and managed via REST 
({{/api/audit/partition-plan}}). All ingestor replicas converge on the same 
versioned plan through a background watcher; {{AuditPartitioner}} routes audit 
records on the hot path from in-memory state only.
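To illustrate "watcher updates, hot path only reads memory", here is a minimal sketch of a version-monotonic in-memory holder. {{PlanSnapshot}} and {{PlanHolderSketch}} are hypothetical names, not the actual {{PartitionPlanHolder}} API; the assumption is that the watcher hands over every plan it reads from the compacted topic and that only strictly newer versions should replace the current one.
{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot; field names follow the example plan JSON.
record PlanSnapshot(long version, int topicPartitionCount, Map<String, int[]> plugins) {}

// Sketch only: a background watcher calls offer(), the routing hot path calls
// current() and never touches Kafka or the network.
final class PlanHolderSketch {
    private final AtomicReference<PlanSnapshot> ref = new AtomicReference<>();

    /** Installs the candidate only if it is newer than the held plan; returns true if installed. */
    boolean offer(PlanSnapshot candidate) {
        while (true) {
            PlanSnapshot cur = ref.get();
            if (cur != null && candidate.version() <= cur.version()) {
                return false; // stale or duplicate record (e.g. re-read after a watcher restart)
            }
            if (ref.compareAndSet(cur, candidate)) {
                return true;
            }
            // another thread won the race; re-check against the new value
        }
    }

    /** Lock-free read for routing; null until the first plan has been loaded. */
    PlanSnapshot current() {
        return ref.get();
    }
}
{code}
Because every replica applies the same "newer version wins" rule to the same single-partition topic, replicas that have read the same records end up holding the same plan.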

Feature flag (default off): 
{{ranger.audit.ingestor.kafka.partition.plan.dynamic.enabled=false}}

 
----
h2. Problem

Today (static mode), audit-ingestor loads two kinds of configuration from site 
XML, and only at startup:
||Concern||Question answered||Static configuration||
|Service allowlist|May this Kerberos principal POST audits for repo *R*?|{{ranger.audit.ingestor.service.<repo>.allowed.users}} in site XML|
|Partition routing|Once an audit is accepted, which {{ranger_audits}} partition does it go to?|{{kafka.configured.plugins}} + per-plugin overrides in site XML|

Changing either requires editing XML and *restarting every ingestor replica*. 
Static allocation also assigns contiguous partition ranges in plugin order, so 
changing an early plugin's partition count shifts the ranges of every plugin 
configured after it.
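The reshuffle problem and the append-only alternative can be shown with a small sketch. {{AllocationSketch}} is hypothetical; it assumes static mode hands out contiguous ranges in configured plugin order, and that append-only growth gives a plugin new partition IDs starting at the current topic partition count.
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AllocationSketch {
    // Contiguous allocation: walk plugins in configured order; each takes the next `count` IDs.
    static Map<String, List<Integer>> contiguous(LinkedHashMap<String, Integer> counts) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < e.getValue(); i++) {
                parts.add(next++);
            }
            out.put(e.getKey(), parts);
        }
        return out;
    }

    // Append-only growth: keep existing IDs and add fresh ones at the end of the topic,
    // so no other plugin's assignment changes.
    static List<Integer> grow(List<Integer> existing, int currentTopicPartitions, int extra) {
        List<Integer> parts = new ArrayList<>(existing);
        for (int i = 0; i < extra; i++) {
            parts.add(currentTopicPartitions + i);
        }
        return parts;
    }
}
{code}
With {{hdfs=6, hiveServer2=6}}, contiguous allocation gives {{hiveServer2}} partitions 6-11; raising {{hdfs}} to 8 moves {{hiveServer2}} to 8-13. Growing {{hdfs}} append-only on a 48-partition topic instead yields {{[0..5, 48, 49]}} and leaves {{hiveServer2}} untouched.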

*Goals:*
 * Onboard new plugins/repos and scale hot plugins without restarting ingestors
 * Append-only partition growth (no reshuffle of existing plugin assignments)
 * One shared source of truth across all ingestor pods
 * No new infrastructure (no Postgres / ZooKeeper for the registry)

----
h2. Solution

Introduce a *unified partition plan document* (versioned JSON) in Kafka topic 
{{ranger_audit_partition_plan}} (1 partition, compacted). One document holds:
 * {{plugins}} — dedicated partition IDs per plugin id (Kafka record key / 
agent id)
 * {{buffer}} — partition pool for not-yet-promoted plugins (sticky hash)
 * {{services}} — per-repo {{allowedUsers}} for {{POST /api/audit/access}}
 * {{topicPartitionCount}} — must match live {{ranger_audits}} partition count
 * {{version}} — optimistic locking for REST mutations
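A minimal sketch of the plan document as a Java record plus a structural check. The record and {{PlanChecks}} are hypothetical (not the actual Ranger classes), and two rules are assumptions inferred from the field descriptions above: partitions are disjoint across plugins and the buffer ("dedicated"), and every ID must be below {{topicPartitionCount}}, which must equal the live {{ranger_audits}} count.
{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical mirror of the plan JSON; services maps repo -> allowedUsers.
record PartitionPlan(String topic, long version, int topicPartitionCount,
                     Map<String, List<Integer>> plugins, List<Integer> buffer,
                     Map<String, List<String>> services) {}

final class PlanChecks {
    /** Throws IllegalArgumentException if the plan is inconsistent with itself or the live topic. */
    static void validate(PartitionPlan plan, int livePartitionCount) {
        if (plan.topicPartitionCount() != livePartitionCount) {
            throw new IllegalArgumentException("topicPartitionCount " + plan.topicPartitionCount()
                    + " != live partition count " + livePartitionCount);
        }
        Set<Integer> seen = new HashSet<>();
        for (Map.Entry<String, List<Integer>> e : plan.plugins().entrySet()) {
            for (int p : e.getValue()) {
                checkOne(p, plan.topicPartitionCount(), seen, e.getKey());
            }
        }
        for (int p : plan.buffer()) {
            checkOne(p, plan.topicPartitionCount(), seen, "buffer");
        }
    }

    private static void checkOne(int p, int count, Set<Integer> seen, String owner) {
        if (p < 0 || p >= count) {
            throw new IllegalArgumentException(owner + ": partition " + p + " out of range");
        }
        if (!seen.add(p)) {
            throw new IllegalArgumentException(owner + ": partition " + p + " assigned twice");
        }
    }
}
{code}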

*Control plane:* REST + registry topic + {{PartitionPlanWatcher}}
*Data plane:* plugins POST {{/api/audit/access}} → allowlist check → 
{{AuditPartitioner}} → {{ranger_audits}}
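On the control plane, a promote request carrying {{expectedVersion}} can be sketched as a pure function from the current plan to a response. {{MutationSketch}}, its 400 rule, and the choice to give a new plugin fresh partitions at the end of the topic are assumptions for illustration; the sketch shows only the version comparison, not how check-and-write is made atomic across replicas.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MutationSketch {
    record Plan(long version, int topicPartitionCount, Map<String, List<Integer>> plugins) {}
    record Result(int status, Plan plan) {}

    /** Promote a new plugin with `count` dedicated partitions, using optimistic locking. */
    static Result promote(Plan current, long expectedVersion, String pluginId, int count) {
        if (expectedVersion != current.version()) {
            return new Result(409, current); // caller read an older plan; must re-GET and retry
        }
        if (count <= 0 || current.plugins().containsKey(pluginId)) {
            return new Result(400, current); // assumed validation rule for this sketch
        }
        // Append-only: new IDs start at the current partition count (topic is grown first).
        List<Integer> fresh = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            fresh.add(current.topicPartitionCount() + i);
        }
        Map<String, List<Integer>> plugins = new HashMap<>(current.plugins());
        plugins.put(pluginId, List.copyOf(fresh));
        Plan next = new Plan(current.version() + 1, current.topicPartitionCount() + count, Map.copyOf(plugins));
        return new Result(200, next);
    }
}
{code}
Promoting {{ozone}} with 4 partitions against plan version 12 (48 partitions) yields version 13 with {{ozone}} on 48-51; replaying the same request with {{expectedVersion=12}} against version 13 returns 409.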

Solr/HDFS dispatchers are unchanged; they consume all partitions of 
{{ranger_audits}}.
----
h2. Key deliverables
 * *Kafka registry* — {{KafkaPartitionPlanRegistry}}; bootstraps from XML on 
greenfield installs and supports brownfield pre-seeding
 * *REST API* — {{GET/PATCH /partition-plan}}, {{POST /partition-plan/plugins}}, 
{{POST /partition-plan/services}}, {{PATCH /partition-plan/plugins/<pluginId>}}; 
mutating calls take {{expectedVersion}} (409 when it is stale)
 * *Dynamic partitioner* — {{AuditPartitioner}} reads {{PartitionPlanHolder}}; 
round-robin across dedicated partitions for promoted plugins; sticky hash into 
the buffer pool for unknown plugins; after a scale-out, routing uses 
{{max(cluster, plan)}} as the partition-count bound while Kafka cluster 
metadata lags the plan
 * *Unified allowlist* — {{services}} map in the same registry; {{auth_to_local}} 
rules are recomposed from the allowlists in dynamic mode
 * *Topic growth* — grow {{ranger_audits}} before writing to the registry when 
promoting or scaling, so the plan never references partitions that do not exist yet
 * *E2E harness* — Docker Tier 3 scripts for partition-plan REST, plugin 
onboard/routing, auth_to_local, full plugin→Solr pipelines (HDFS, Ozone, Hive, 
etc.)
 * *Documentation* — implementation guide, ops runbook, brownfield migration, 
E2E test plan
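The partitioner behavior described under *Dynamic partitioner* can be sketched as follows. {{RoutingSketch}} is hypothetical and not the real {{AuditPartitioner}}; it uses {{String.hashCode()}} (deterministic across JVMs) for the sticky buffer hash, and it assumes the bound logic means "accept a planned partition if it is below {{max(cluster, plan)}}", so partitions added by a scale-out are used even before producer metadata catches up.
{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RoutingSketch {
    private final Map<String, List<Integer>> promoted; // "plugins" section of the plan
    private final List<Integer> buffer;                // "buffer.partitions"
    private final int planPartitionCount;              // "topicPartitionCount"
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    RoutingSketch(Map<String, List<Integer>> promoted, List<Integer> buffer, int planPartitionCount) {
        if (buffer.isEmpty()) {
            throw new IllegalStateException("sketch assumes a non-empty buffer pool");
        }
        this.promoted = Map.copyOf(promoted);
        this.buffer = List.copyOf(buffer);
        this.planPartitionCount = planPartitionCount;
    }

    int partition(String pluginId, int clusterPartitionCount) {
        // Bound: trust the plan when cluster metadata still reports the pre-scale count.
        int bound = Math.max(clusterPartitionCount, planPartitionCount);
        List<Integer> dedicated = promoted.get(pluginId);
        if (dedicated != null && !dedicated.isEmpty()) {
            int i = counters.computeIfAbsent(pluginId, k -> new AtomicInteger()).getAndIncrement();
            int p = dedicated.get(Math.floorMod(i, dedicated.size())); // round-robin
            if (p < bound) {
                return p;
            }
        }
        // Unknown plugin (or unusable assignment): same plugin id -> same buffer partition.
        return buffer.get(Math.floorMod(pluginId.hashCode(), buffer.size()));
    }
}
{code}
If the bound were the cluster count alone (48), partition 48 assigned by a 52-partition plan would be rejected and that plugin's audits would spill into the buffer until metadata refreshed.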

----
h2. Configuration
{code:xml}
<property>
  <name>ranger.audit.ingestor.kafka.partition.plan.dynamic.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ranger.audit.ingestor.kafka.partition.plan.topic</name>
  <value>ranger_audit_partition_plan</value>
</property>
{code}
When dynamic mode is on and the registry is empty, the first ingestor pod 
bootstraps an initial plan from the existing XML properties 
({{kafka.configured.plugins}}, buffer and per-plugin partition counts, service 
allowlists).
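A sketch of reading the two properties above and deciding whether to bootstrap. It parses Hadoop-style {{<property>}} XML with the JDK DOM parser only to stay self-contained; {{SiteXmlSketch}} is hypothetical and stands in for whatever configuration loader the ingestor actually uses. The only default it applies is the documented one ({{dynamic.enabled=false}}).
{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

final class SiteXmlSketch {
    static final String DYNAMIC_KEY = "ranger.audit.ingestor.kafka.partition.plan.dynamic.enabled";
    static final String TOPIC_KEY = "ranger.audit.ingestor.kafka.partition.plan.topic";

    /** Collects name/value pairs from every <property> element in the document. */
    static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new HashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element p = (Element) nodes.item(i);
                String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("invalid site XML", e);
        }
    }

    /** Feature flag, defaulting to false as documented. */
    static boolean dynamicEnabled(Map<String, String> props) {
        return Boolean.parseBoolean(props.getOrDefault(DYNAMIC_KEY, "false"));
    }

    /** Bootstrap from XML only in dynamic mode and only when the registry has no plan yet. */
    static boolean shouldBootstrap(Map<String, String> props, boolean registryEmpty) {
        return dynamicEnabled(props) && registryEmpty;
    }
}
{code}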
----
h2. Example plan JSON
{code:json}
{
  "topic": "ranger_audits",
  "version": 12,
  "topicPartitionCount": 48,
  "plugins": {
    "hdfs":        { "partitions": [0, 1, 2, 3, 4, 5] },
    "hiveServer2": { "partitions": [6, 7, 8, 9, 10, 11] }
  },
  "buffer": { "partitions": [12, 13, "..."] },
  "services": {
    "dev_hive":  { "allowedUsers": ["hive"] },
    "dev_ozone": { "allowedUsers": ["om", "ozone"] },
    "dev_hdfs":  { "allowedUsers": ["hdfs", "nn"] }
  }
}
{code}
----
h2. Testing

Validated scenarios include:
 * static vs dynamic mode
 * greenfield bootstrap
 * REST promote/scale (200/409/400)
 * multi-pod convergence
 * brownfield pre-seed
 * plugin onboarding + Kafka partition routing
 * auth_to_local recomposition
 * full HDFS/Ozone/Hive audit pipelines
----
h2. Acceptance criteria
 # With {{dynamic.enabled=false}}, behavior matches the existing static XML 
partitioning, and {{GET /partition-plan}} returns 503
 # With {{dynamic.enabled=true}}, the registry topic is created (1 partition, 
compacted) and a bootstrap plan is published on greenfield installs
 # REST promote/scale updates the plan version; a stale {{expectedVersion}} 
returns 409
 # All ingestor replicas converge to the same plan within one watcher refresh 
interval
 # Audits for a promoted plugin land only on its assigned {{ranger_audits}} 
partitions
 # {{POST /partition-plan/services}} updates the allowlist; requests from a 
principal not on the allowlist return 403
 # Post-scale routing works while Kafka metadata lags the plan 
({{AuditPartitioner}} bound logic)
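Criterion 6 can be pictured with a small allowlist check against the {{services}} map. {{AllowlistSketch}} is hypothetical: its {{shortName}} is a deliberately simplified stand-in for {{auth_to_local}} (it keeps the primary of {{primary/instance@REALM}}), and rejecting a repo that has no {{services}} entry is an assumption, not documented behavior.
{code:java}
import java.util.List;
import java.util.Map;

final class AllowlistSketch {
    // Simplified auth_to_local stand-in: "hive/host@REALM" or "hive@REALM" -> "hive".
    static String shortName(String principal) {
        int end = principal.length();
        int slash = principal.indexOf('/');
        int at = principal.indexOf('@');
        if (slash >= 0) end = Math.min(end, slash);
        if (at >= 0) end = Math.min(end, at);
        return principal.substring(0, end);
    }

    /** Returns the HTTP status the access endpoint would answer with in this sketch. */
    static int check(Map<String, List<String>> services, String repo, String principal) {
        List<String> allowed = services.get(repo);
        if (allowed == null) {
            return 403; // assumption: repos without a services entry are rejected
        }
        return allowed.contains(shortName(principal)) ? 200 : 403;
    }
}
{code}
With the example plan, {{hive/host1.example.com@EXAMPLE.COM}} may post to {{dev_hive}}, while an {{hdfs/...}} principal posting to {{dev_hive}} is refused.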

  was:
Implement a *dynamic unified ingestor registry* for Ranger audit-ingestor so 
operators can change Kafka partition routing and per-repo service allowlists at 
runtime — without restarting ingestor pods.

The registry is stored in a Kafka compacted topic 
({{ranger_audit_partition_plan}}) and managed via REST 
({{/api/audit/partition-plan}}). All ingestor replicas converge on the same 
versioned plan through a background watcher; {{AuditPartitioner}} routes audit 
records on the hot path from in-memory state only.

Feature flag (default off): 
{{ranger.audit.ingestor.kafka.partition.plan.dynamic.enabled=false}}

 
----
h2. Problem

Today (static mode), audit-ingestor loads two kinds of configuration from site 
XML, and only at startup:
||Concern||Question answered||Static configuration||
|Service allowlist|May this Kerberos principal POST audits for repo *R*?|{{ranger.audit.ingestor.service.<repo>.allowed.users}} in site XML|
|Partition routing|Once an audit is accepted, which {{ranger_audits}} partition does it go to?|{{kafka.configured.plugins}} + per-plugin overrides in site XML|

Changing either requires editing XML and *restarting every ingestor replica*. 
Static allocation also assigns contiguous partition ranges in plugin order, so 
changing an early plugin's partition count shifts the ranges of every plugin 
configured after it.

*Goals:*
 * Onboard new plugins/repos and scale hot plugins without restarting ingestors
 * Append-only partition growth (no reshuffle of existing plugin assignments)
 * One shared source of truth across all ingestor pods
 * No new infrastructure (no Postgres / ZooKeeper for the registry)

----
h2. Solution

Introduce a *unified partition plan document* (versioned JSON) in Kafka topic 
{{ranger_audit_partition_plan}} (1 partition, compacted). One document holds:
 * {{plugins}} — dedicated partition IDs per plugin id (Kafka record key / 
agent id)
 * {{buffer}} — partition pool for not-yet-promoted plugins (sticky hash)
 * {{services}} — per-repo {{allowedUsers}} for {{POST /api/audit/access}}
 * {{topicPartitionCount}} — must match live {{ranger_audits}} partition count
 * {{version}} — optimistic locking for REST mutations

*Control plane:* REST + registry topic + {{PartitionPlanWatcher}}
*Data plane:* plugins POST {{/api/audit/access}} → allowlist check → 
{{AuditPartitioner}} → {{ranger_audits}}

Solr/HDFS dispatchers are unchanged; they consume all partitions of 
{{ranger_audits}}.
----
h2. Key deliverables
 * *Kafka registry* — {{KafkaPartitionPlanRegistry}}; bootstraps from XML on 
greenfield installs and supports brownfield pre-seeding
 * *REST API* — {{GET/PATCH /partition-plan}}, {{POST /partition-plan/plugins}}, 
{{POST /partition-plan/services}}, {{PATCH /partition-plan/plugins/<pluginId>}}; 
mutating calls take {{expectedVersion}} (409 when it is stale)
 * *Dynamic partitioner* — {{AuditPartitioner}} reads {{PartitionPlanHolder}}; 
round-robin across dedicated partitions for promoted plugins; sticky hash into 
the buffer pool for unknown plugins; after a scale-out, routing uses 
{{max(cluster, plan)}} as the partition-count bound while Kafka cluster 
metadata lags the plan
 * *Unified allowlist* — {{services}} map in the same registry; {{auth_to_local}} 
rules are recomposed from the allowlists in dynamic mode
 * *Topic growth* — grow {{ranger_audits}} before writing to the registry when 
promoting or scaling, so the plan never references partitions that do not exist yet
 * *E2E harness* — Docker Tier 3 scripts for partition-plan REST, plugin 
onboard/routing, auth_to_local, full plugin→Solr pipelines (HDFS, Ozone, Hive, 
etc.)
 * *Documentation* — implementation guide, ops runbook, brownfield migration, 
E2E test plan

----
h2. Configuration
{code:xml}
<property>
  <name>ranger.audit.ingestor.kafka.partition.plan.dynamic.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ranger.audit.ingestor.kafka.partition.plan.topic</name>
  <value>ranger_audit_partition_plan</value>
</property>
{code}
When dynamic mode is on and the registry is empty, the first ingestor pod 
bootstraps an initial plan from the existing XML properties 
({{kafka.configured.plugins}}, buffer and per-plugin partition counts, service 
allowlists).
----
h2. Example plan JSON
{code:json}
{
  "topic": "ranger_audits",
  "version": 12,
  "topicPartitionCount": 48,
  "plugins": {
    "hdfs":        { "partitions": [0, 1, 2, 3, 4, 5] },
    "hiveServer2": { "partitions": [6, 7, 8, 9, 10, 11] }
  },
  "buffer": { "partitions": [12, 13, "..."] },
  "services": {
    "dev_hive":  { "allowedUsers": ["hive"] },
    "dev_ozone": { "allowedUsers": ["om", "ozone"] },
    "dev_hdfs":  { "allowedUsers": ["hdfs", "nn"] }
  }
}
{code}
----
h2. Testing

*Unit tests:*
{code:bash}
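# Build and run unit tests for the audit-common and audit-ingestor modules,
# skipping the Apache RAT license check
mvn verify -pl audit-server/audit-common,audit-server/audit-ingestor -Drat.skip=true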
{code}
Key classes: {{AuditPartitionerDynamicTest}}, {{PartitionPlanAllocatorTest}}, 
{{PartitionPlanValidatorTest}}, {{PartitionPlanServiceMutationTest}}, 
{{PartitionPlanBootstrapTest}}

Validated scenarios include: static vs dynamic mode, greenfield bootstrap, REST 
promote/scale (200/409/400), multi-pod convergence, brownfield pre-seed, plugin 
onboarding + Kafka partition routing, auth_to_local recomposition, and full 
HDFS/Ozone/Hive audit pipelines.
----
h2. Acceptance criteria
 # With {{dynamic.enabled=false}}, behavior matches the existing static XML 
partitioning, and {{GET /partition-plan}} returns 503
 # With {{dynamic.enabled=true}}, the registry topic is created (1 partition, 
compacted) and a bootstrap plan is published on greenfield installs
 # REST promote/scale updates the plan version; a stale {{expectedVersion}} 
returns 409
 # All ingestor replicas converge to the same plan within one watcher refresh 
interval
 # Audits for a promoted plugin land only on its assigned {{ranger_audits}} 
partitions
 # {{POST /partition-plan/services}} updates the allowlist; requests from a 
principal not on the allowlist return 403
 # Post-scale routing works while Kafka metadata lags the plan 
({{AuditPartitioner}} bound logic)


> Implement dynamic unified ingestor registry for audit-ingestor: runtime Kafka 
> partition routing and per-repo service allowlists via compacted topic + REST, 
> without ingestor restarts. Feature flag default off.
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: RANGER-5655
>                 URL: https://issues.apache.org/jira/browse/RANGER-5655
>             Project: Ranger
>          Issue Type: Task
>          Components: Ranger
>            Reporter: Ramachandran Krishnan
>            Assignee: Ramachandran Krishnan
>            Priority: Major
>             Fix For: 3.0.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
