kumarpritam863 opened a new pull request, #15961:
URL: https://github.com/apache/iceberg/pull/15961

## Summary

Adds a `ConnectorMetrics` facade that instruments the entire write and commit pipeline with 13 metrics (12 counters and 1 gauge) across three tiers: operational essentials, write path visibility, and schema/table management.

Today, the connector has **zero metrics**, so operators monitor production deployments by parsing logs. This PR places instrumentation at every critical point in the data path. When the Kafka dependency moves to 4.0+ (the `PluginMetrics` API tracked in KAFKA-15995), all metrics will then be exposed automatically through the runtime's configured reporters (JMX, Prometheus, Datadog, etc.), with no changes at the instrumentation call sites.

On Kafka 3.9.x (current), counters are tracked in memory via `AtomicLong` and can be read programmatically (e.g., by integration tests or custom health checks). The `ConnectorMetrics.NOOP` singleton adds effectively no overhead in tests.
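
A minimal sketch of how such a facade could look on Kafka 3.9.x, assuming plain `AtomicLong` counters and a disabled `NOOP` instance. Only `recordWritten(table)`, `commitStarted()` and `commitSucceeded(durationMs)` are named in this PR. The class name `ConnectorMetricsSketch` and the methods `inMemory`, `recordsReceived`, `recordsReceivedTotal`, `recordsWrittenTotal`, `commitTotal` and `commitSuccessTotal` are hypothetical, and the real implementation may differ. The sketch uses a `final` flag because it never changes after construction; the PR describes a `volatile boolean`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an in-memory metrics facade; not the PR's actual ConnectorMetrics class.
final class ConnectorMetricsSketch {
  // Shared disabled instance: every recording method returns after a single branch.
  static final ConnectorMetricsSketch NOOP = new ConnectorMetricsSketch(false);

  private final boolean enabled;
  private final AtomicLong recordsReceived = new AtomicLong();
  private final AtomicLong recordsWritten = new AtomicLong();
  private final AtomicLong commitsStarted = new AtomicLong();
  private final AtomicLong commitsSucceeded = new AtomicLong();

  private ConnectorMetricsSketch(boolean enabled) {
    this.enabled = enabled;
  }

  // Enabled instance backed by in-memory counters (hypothetical factory name).
  static ConnectorMetricsSketch inMemory() {
    return new ConnectorMetricsSketch(true);
  }

  // Adds the number of records in one batch handed over by put().
  void recordsReceived(int count) {
    if (!enabled) {
      return;
    }
    recordsReceived.addAndGet(count);
  }

  // tableName is accepted for future per-table tags but is not used yet.
  void recordWritten(String tableName) {
    if (!enabled) {
      return;
    }
    recordsWritten.incrementAndGet();
  }

  // Counts every commit cycle that is initiated.
  void commitStarted() {
    if (!enabled) {
      return;
    }
    commitsStarted.incrementAndGet();
  }

  // durationMs is accepted for future histogram/average wiring but is not stored.
  void commitSucceeded(long durationMs) {
    if (!enabled) {
      return;
    }
    commitsSucceeded.incrementAndGet();
  }

  long recordsReceivedTotal() { return recordsReceived.get(); }

  long recordsWrittenTotal() { return recordsWritten.get(); }

  long commitTotal() { return commitsStarted.get(); }

  long commitSuccessTotal() { return commitsSucceeded.get(); }
}
```

Call sites hold one reference and call, for example, `metrics.recordWritten(tableName)`; tests can pass `ConnectorMetricsSketch.NOOP`, whose counters stay at zero.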
   
### Metrics catalog

#### Tier 1: Operational Essentials
| Metric | Type | Instrumentation Point |
|--------|------|-----------------------|
| `records-received-total` | Counter | `SinkWriter.save()`: records in each batch received from `put()` |
| `records-written-total` | Counter | `IcebergWriter.write()`: each record successfully written |
| `records-dropped-total` | Counter | Reserved for routing drops (tombstones, empty route targets) |
| `record-conversion-errors-total` | Counter | `IcebergWriter.write()` catch block: type mismatches, parse failures |
| `commit-total` | Counter | `Coordinator.commit()`: every commit cycle initiated |
| `commit-success-total` | Counter | `Coordinator.commit()`: successful `doCommit()` completion |
| `commit-failure-total` | Counter | `Coordinator.commit()`: exception caught during the commit cycle |
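
If every commit cycle ends either in a successful `doCommit()` or in a caught exception, the three commit counters satisfy `commit-total = commit-success-total + commit-failure-total`. A minimal sketch of that bracketing, assuming the commit body can be passed in as a `Runnable`; `CommitCounterSketch` and `runCommitCycle` are hypothetical stand-ins for `Coordinator.commit()` and `doCommit()`, not the PR's code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: how commit-total, commit-success-total and
// commit-failure-total could bracket one commit cycle.
final class CommitCounterSketch {
  final AtomicLong commitTotal = new AtomicLong();
  final AtomicLong commitSuccessTotal = new AtomicLong();
  final AtomicLong commitFailureTotal = new AtomicLong();

  // doCommit stands in for the coordinator's doCommit(); returns true on success.
  boolean runCommitCycle(Runnable doCommit) {
    commitTotal.incrementAndGet(); // every cycle initiated
    try {
      doCommit.run();
      commitSuccessTotal.incrementAndGet(); // doCommit() completed normally
      return true;
    } catch (RuntimeException e) {
      // The failure is counted and swallowed here; a real coordinator would also log it.
      commitFailureTotal.incrementAndGet();
      return false;
    }
  }
}
```

Incrementing `commitTotal` before the `try` guarantees that a cycle is counted even when it fails immediately.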
   
#### Tier 2: Write Path
| Metric | Type | Instrumentation Point |
|--------|------|-----------------------|
| `data-files-written-total` | Counter | `IcebergWriter.flush()`: count from `WriteResult.dataFiles()` |
| `delete-files-written-total` | Counter | `IcebergWriter.flush()`: count from `WriteResult.deleteFiles()` |
| `active-writers` | Gauge | Size of the `SinkWriter.writers` map, registered via a supplier |
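
Unlike the counters, `active-writers` is read on demand instead of being incremented, so it always reflects the current map size. A minimal sketch of a supplier-backed gauge next to the two file counters, assuming a simple name-to-`LongSupplier` registry. `WritePathMetricsSketch`, `registerGauge`, `readGauge` and `filesFlushed` are hypothetical, and the integer file counts stand in for the number of entries returned by `WriteResult.dataFiles()` and `WriteResult.deleteFiles()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of the Tier 2 write-path metrics; not the PR's actual code.
final class WritePathMetricsSketch {
  private final Map<String, LongSupplier> gauges = new ConcurrentHashMap<>();
  final AtomicLong dataFilesWritten = new AtomicLong();
  final AtomicLong deleteFilesWritten = new AtomicLong();

  // The supplier is evaluated each time the gauge is read, not when it is registered.
  void registerGauge(String name, LongSupplier supplier) {
    gauges.put(name, supplier);
  }

  // Returns 0 for an unknown gauge name.
  long readGauge(String name) {
    LongSupplier supplier = gauges.get(name);
    return supplier == null ? 0L : supplier.getAsLong();
  }

  // Called after a flush with the number of data and delete files it produced.
  void filesFlushed(String tableName, int dataFileCount, int deleteFileCount) {
    dataFilesWritten.addAndGet(dataFileCount);
    deleteFilesWritten.addAndGet(deleteFileCount);
  }
}
```

Registering `() -> writers.size()` once is enough; no call site has to update the gauge when writers are added or removed.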
   
#### Tier 3: Schema & Table Management
| Metric | Type | Instrumentation Point |
|--------|------|-----------------------|
| `schema-evolutions-total` | Counter | `IcebergWriter.convertToRow()`: schema update detected and applied |
| `tables-auto-created-total` | Counter | `IcebergWriterFactory.autoCreateTable()`: after successful creation |
| `commit-timeout-total` | Counter | `Coordinator.process()`: commit timed out waiting for workers |
   
### Design decisions

- **`ConnectorMetrics` facade**: All instrumentation calls go through one class (`recordWritten(table)`, `commitStarted()`, etc.). This decouples metric recording from the metric backend, so when `PluginMetrics` becomes available, only this class changes.
- **`enabled` guard on all methods**: The `NOOP` singleton has `enabled=false`, so every method returns after a single, easily predicted branch. Enabled instances check the flag once per call (a branch on a `volatile boolean`). When disabled, the cost is effectively nil.
- **Ready for KAFKA-15995**: `ConnectorMetrics.create()` contains the try-catch pattern the KIP recommends (`catch NoSuchMethodError | NoClassDefFoundError`). Once Kafka 4.0+ is available, metrics register automatically under `kafka.connect:type=plugins,connector=<name>,task=<id>`.
- **No new connector config**: The KIP handles all metric infrastructure (reporters, JMX registration, cleanup), so no `iceberg.metrics.*` properties are needed.
- **`tableName` parameter on per-table methods**: Although table-scoped sensors aren't wired yet, the parameter is accepted now so that call sites won't need to change when `PluginMetrics` adds per-table tag support.
- **Duration tracking accepted but not stored**: `commitSucceeded(durationMs)` and `flushCompleted(tableName, durationMs)` accept a duration for future histogram/average wiring but do not record it yet.
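
The `create()` fallback can be sketched without depending on any particular Kafka version: the attempt to obtain a `PluginMetrics`-backed implementation is wrapped in a `Supplier`, and the linkage errors an older runtime would raise select the in-memory fallback instead. `MetricsFactorySketch`, `Backend` and both arguments are hypothetical stand-ins; the actual `ConnectorMetrics.create()` calls into the Kafka Connect API directly and may be structured differently.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a version-tolerant factory in the spirit of ConnectorMetrics.create().
final class MetricsFactorySketch {
  // Stand-in for whatever actually records metrics.
  interface Backend {
    String name();
  }

  static Backend create(Supplier<Backend> pluginMetricsBackend, Backend inMemoryFallback) {
    try {
      // On a newer runtime, this call would resolve the PluginMetrics API.
      return pluginMetricsBackend.get();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
      // Older runtime: the method or class is missing at link time, so fall back.
      return inMemoryFallback;
    }
  }
}
```

Catching only these two `LinkageError` subclasses, instead of comparing version strings, lets a single build run on both old and new workers while other failures in the newer code path still propagate.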

